Skip to content

Commit

Permalink
feat: add mongo repo
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Jul 29, 2024
1 parent cc66e32 commit 75866bc
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use unit::UnitDataStream;
name = "compute_unit_runner",
version = "0.0.1",
author = "Author Name <github.com/GitDataAI/jz-action>",
about = "embed in k8s images"
about = "embed in k8s images. work for process data input and output"
)]
struct Args {
#[arg(short, long, default_value = "INFO")]
Expand Down
2 changes: 1 addition & 1 deletion crates/dp_runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use unit::UnitDataStream;
name = "dp_runner",
version = "0.0.1",
author = "Author Name <github.com/GitDataAI/jz-action>",
about = "embed in k8s images"
about = "embed in k8s images. make process data input output"
)]
struct Args {
#[arg(short, long, default_value = "INFO")]
Expand Down
2 changes: 1 addition & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ $(OUTPUT):

################### build crates
build-cd: $(OUTPUT)
cargo build -p compute_unit_runner --release
cargo build -p compute_unit_runner --release --bin compute_unit_runner
cp target/release/compute_unit_runner $(OUTPUT)/compute_unit_runner

build-dp: $(OUTPUT)
Expand Down
2 changes: 1 addition & 1 deletion nodes/jz_reader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tracing::{info, Level};
name = "jz_reader",
version = "0.0.1",
author = "Author Name <github.com/GitDataAI/jz-action>",
about = "embed in k8s images"
about = "embed in k8s images. "
)]

struct Args {
Expand Down
6 changes: 6 additions & 0 deletions src/core/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,9 @@ pub trait NodeRepo {
async fn insert_node(&self, state: Node) -> Result<()>;
async fn get_node_by_name(&self, name: &str) -> Result<Node>;
}

pub trait DbRepo = GraphRepo + NodeRepo + Send + Sync + 'static;

pub trait DBConfig {
fn connection_string(&self) -> &str;
}
2 changes: 1 addition & 1 deletion src/dag/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::{anyhow, Ok, Result};
use std::collections::HashMap;

pub struct Dag {
name: String,
pub name: String,
nodes: HashMap<String, ComputeUnit>,
/// Store dependency relations.
rely_graph: Graph,
Expand Down
29 changes: 23 additions & 6 deletions src/dbrepo/mongo.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,38 @@
use crate::{
core::models::{Graph, GraphRepo, Node, NodeRepo},
core::models::{DBConfig, Graph, GraphRepo, Node, NodeRepo},
utils::StdIntoAnyhowResult,
};
use anyhow::{anyhow, Result};
use mongodb::{bson::doc, options::IndexOptions, Client, Collection, IndexModel};
use std::{ops::Deref, sync::Arc};
use serde::Serialize;
use std::{marker::PhantomData, ops::Deref, sync::Arc};
use tracing::error;

const GRAPH_COL_NAME: &'static str = "graph";
const NODE_COL_NAME: &'static str = "node";

#[derive(Clone)]
pub struct MongoRepo {
graph_col: Collection<Graph>,
node_col: Collection<Node>,
}

#[derive(Clone, Serialize)]
pub struct MongoConfig {
pub mongo_url: String,
}

impl DBConfig for MongoConfig {
fn connection_string(&self) -> &str {
return &self.mongo_url;
}
}

impl MongoRepo {
pub async fn new(mongo_url: &str, db_name: &str) -> Result<Self> {
let client = Client::with_uri_str(mongo_url).await?;
pub async fn new<DBC>(config: DBC, db_name: &str) -> Result<Self>
where DBC: DBConfig
{
let client = Client::with_uri_str(config.connection_string()).await?;
let database = client.database(db_name);
let graph_col: Collection<Graph> = database.collection(&GRAPH_COL_NAME);
let node_col: Collection<Node> = database.collection(&NODE_COL_NAME);
Expand All @@ -41,7 +56,8 @@ impl MongoRepo {
}
}

impl GraphRepo for MongoRepo {
impl GraphRepo for MongoRepo
{
async fn insert_global_state(&self, state: Graph) -> Result<()> {
self.graph_col.insert_one(state).await.map(|_| ()).anyhow()
}
Expand All @@ -55,7 +71,8 @@ impl GraphRepo for MongoRepo {
}
}

impl NodeRepo for MongoRepo {
impl NodeRepo for MongoRepo
{
async fn insert_node(&self, state: Node) -> Result<()> {
self.node_col.insert_one(state).await.map(|_| ()).anyhow()
}
Expand Down
Loading

0 comments on commit 75866bc

Please sign in to comment.