Skip to content

Commit

Permalink
chores: better fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 3, 2024
1 parent eb2351d commit 0d709ea
Show file tree
Hide file tree
Showing 23 changed files with 562 additions and 184 deletions.
52 changes: 40 additions & 12 deletions crates/compute_unit_runner/src/bin/compute_unit_runner.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,46 @@
use compute_unit_runner::fs_cache::{FSCache, FileCache, MemCache};
use compute_unit_runner::{ipc, media_data_tracker};
use jz_action::dbrepo::mongo::{MongoConfig, MongoRepo};
use jz_action::utils::StdIntoAnyhowResult;
use compute_unit_runner::{
fs_cache::{
FSCache,
FileCache,
MemCache,
},
ipc,
media_data_tracker,
};
use jz_action::{
dbrepo::mongo::{
MongoConfig,
MongoRepo,
},
utils::StdIntoAnyhowResult,
};

use anyhow::{anyhow, Result};
use anyhow::{
anyhow,
Result,
};
use clap::Parser;
use media_data_tracker::MediaDataTracker;
use std::str::FromStr;
use std::sync::Arc;
use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tracing::{error, info, Level};
use std::{
str::FromStr,
sync::Arc,
};
use tokio::{
select,
signal::unix::{
signal,
SignalKind,
},
sync::{
mpsc,
Mutex,
},
};
use tracing::{
error,
info,
Level,
};

#[derive(Debug, Parser)]
#[command(
Expand Down
32 changes: 25 additions & 7 deletions crates/compute_unit_runner/src/fs_cache.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
use anyhow::{anyhow, Result};
use anyhow::{
anyhow,
Result,
};
use async_trait::async_trait;
use jz_action::{
network::datatransfer::{MediaDataBatchResponse, MediaDataCell},
utils::{IntoAnyhowResult, StdIntoAnyhowResult},
network::datatransfer::{
MediaDataBatchResponse,
MediaDataCell,
},
utils::{
IntoAnyhowResult,
StdIntoAnyhowResult,
},
};
use std::{
collections::HashMap,
path::{Path, PathBuf},
path::{
Path,
PathBuf,
},
sync::Arc,
time::Instant,
};
use tokio::fs;
use tokio::sync::Mutex;
use tracing::{debug, error, info};
use tokio::{
fs,
sync::Mutex,
};
use tracing::{
debug,
error,
info,
};
use walkdir::WalkDir;

#[async_trait]
Expand Down
68 changes: 51 additions & 17 deletions crates/compute_unit_runner/src/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,60 @@
use crate::media_data_tracker::MediaDataTracker;
use actix_web::http::StatusCode;
use actix_web::web::Data;
use actix_web::{error, middleware, web, App, HttpRequest, HttpResponse, HttpServer};
use anyhow::anyhow;
use anyhow::Result;
use actix_web::{
error,
http::StatusCode,
middleware,
web,
web::Data,
App,
HttpRequest,
HttpResponse,
HttpServer,
};
use anyhow::{
anyhow,
Result,
};
use core::str;
use http_body_util::Collected;
use jz_action::core::models::{DbRepo, TrackerState};
use jz_action::utils::StdIntoAnyhowResult;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tokio::time::sleep;
use jz_action::{
core::models::{
DbRepo,
TrackerState,
},
utils::StdIntoAnyhowResult,
};
use serde::{
Deserialize,
Serialize,
};
use std::{
sync::Arc,
time::Duration,
};
use tokio::{
sync::{
oneshot,
Mutex,
},
time::sleep,
};
use tracing::info;

use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper::{Method, Request};
use http_body_util::{
BodyExt,
Full,
};
use hyper::{
body::Bytes,
Method,
Request,
};
use hyper_util::client::legacy::Client;
use hyperlocal::{UnixClientExt, UnixConnector, Uri};
use hyperlocal::{
UnixClientExt,
UnixConnector,
Uri,
};

#[derive(Debug, Serialize, Deserialize)]
pub struct AvaiableDataResponse {
Expand Down
70 changes: 54 additions & 16 deletions crates/compute_unit_runner/src/media_data_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,60 @@
use crate::fs_cache::FileCache;
use crate::ipc::{AvaiableDataResponse, CompleteDataReq, SubmitOuputDataReq};
use anyhow::{anyhow, Ok, Result};
use jz_action::core::models::{DataRecord, DataState, DbRepo, Direction, NodeRepo, TrackerState};
use jz_action::network::common::Empty;
use jz_action::network::datatransfer::data_stream_client::DataStreamClient;
use std::sync::Arc;
use std::time::Duration;
use crate::{
fs_cache::FileCache,
ipc::{
AvaiableDataResponse,
CompleteDataReq,
SubmitOuputDataReq,
},
};
use anyhow::{
anyhow,
Ok,
Result,
};
use jz_action::{
core::models::{
DataRecord,
DataState,
DbRepo,
Direction,
NodeRepo,
TrackerState,
},
network::{
common::Empty,
datatransfer::data_stream_client::DataStreamClient,
},
};
use std::{
sync::Arc,
time::Duration,
};
use tokio::sync::mpsc::error::TrySendError;
use tonic::transport::Channel;
use tonic::Code;
use tracing::{debug, error, info, warn};
use tonic::{
transport::Channel,
Code,
};
use tracing::{
debug,
error,
info,
warn,
};

use crate::multi_sender::MultiSender;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::time::{self, sleep, Instant};
use tokio::{
select,
sync::{
mpsc,
oneshot,
Mutex,
},
time::{
self,
sleep,
Instant,
},
};
use tokio_stream::StreamExt;

pub struct MediaDataTracker<R>
Expand Down
26 changes: 19 additions & 7 deletions crates/compute_unit_runner/src/mprc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use rand::Rng;
use std::hash::Hash;
use std::{borrow::Borrow, vec};
use std::{
borrow::Borrow,
hash::Hash,
vec,
};

pub(crate) struct Mprs<K, T>
where
Expand Down Expand Up @@ -66,11 +69,20 @@ where
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;
use std::{
sync::{
atomic::{
AtomicUsize,
Ordering,
},
Arc,
},
time::Duration,
};
use tokio::{
sync::mpsc,
time::sleep,
};

#[tokio::test]
async fn test_mprs_insert_remove() {
Expand Down
8 changes: 6 additions & 2 deletions crates/compute_unit_runner/src/multi_sender.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use jz_action::network::datatransfer::{
data_stream_client::DataStreamClient, MediaDataBatchResponse,
data_stream_client::DataStreamClient,
MediaDataBatchResponse,
};
use tokio::time::Instant;
use tonic::transport::Channel;
use tracing::{debug, error};
use tracing::{
debug,
error,
};

pub struct MultiSender {
streams: Vec<String>,
Expand Down
47 changes: 37 additions & 10 deletions crates/dp_runner/src/channel_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,41 @@
use anyhow::{anyhow, Result};
use anyhow::{
anyhow,
Result,
};
use compute_unit_runner::fs_cache::FileCache;
use jz_action::core::models::{DataRecord, DataState, DbRepo, Direction, TrackerState};
use jz_action::network::datatransfer::MediaDataBatchResponse;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::time::{self, Instant};
use tokio::select;
use tracing::{debug, error, info, warn};
use jz_action::{
core::models::{
DataRecord,
DataState,
DbRepo,
Direction,
TrackerState,
},
network::datatransfer::MediaDataBatchResponse,
};
use std::{
sync::Arc,
time::Duration,
};
use tokio::{
select,
sync::{
mpsc,
mpsc::Sender,
oneshot,
Mutex,
},
time::{
self,
Instant,
},
};
use tracing::{
debug,
error,
info,
warn,
};

pub struct ChannelTracker<R>
where
Expand Down
Loading

0 comments on commit 0d709ea

Please sign in to comment.