Skip to content

Commit

Permalink
versioned tables, version sync, and rewritten watcher logic (#469)
Browse files Browse the repository at this point in the history
  • Loading branch information
phiSgr authored Feb 27, 2024
1 parent 9261d7a commit 276dbc1
Show file tree
Hide file tree
Showing 19 changed files with 832 additions and 384 deletions.
9 changes: 5 additions & 4 deletions apps/framework-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apps/framework-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ lazy_static = "1.4.0"
anyhow = "1.0"
spinners = "4.1.1"
git2 = { version = "0.18.1", features = ["vendored-libgit2"] }
regex = "1.10.3"

[dev-dependencies]
clickhouse = { version = "0.11.5", features = ["uuid", "test-util"] }
Expand Down
16 changes: 15 additions & 1 deletion apps/framework-cli/src/cli/local_webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ struct RouteService {
route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>>,
configured_producer: ConfiguredProducer,
console_config: ConsoleConfig,
current_version: String,
}

impl Service<Request<Incoming>> for RouteService {
Expand All @@ -77,6 +78,7 @@ impl Service<Request<Incoming>> for RouteService {
fn call(&self, req: Request<Incoming>) -> Self::Future {
Box::pin(router(
req,
self.current_version.clone(),
self.route_table,
self.configured_producer.clone(),
self.console_config.clone(),
Expand Down Expand Up @@ -189,6 +191,7 @@ async fn ingest_route(

async fn router(
req: Request<hyper::body::Incoming>,
current_version: String,
route_table: &RwLock<HashMap<PathBuf, RouteMeta>>,
configured_producer: ConfiguredProducer,
console_config: ConsoleConfig,
Expand All @@ -211,9 +214,19 @@ async fn router(
);

let route_split = route.to_str().unwrap().split('/').collect::<Vec<&str>>();

match (req.method(), &route_split[..]) {
(&hyper::Method::POST, ["ingest", _]) => {
ingest_route(
req,
// without explicit version, go to current project version
route.join(current_version),
configured_producer,
route_table,
console_config,
)
.await
}
(&hyper::Method::POST, ["ingest", _, _]) => {
ingest_route(req, route, configured_producer, route_table, console_config).await
}

Expand Down Expand Up @@ -283,6 +296,7 @@ impl Webserver {

let route_service = RouteService {
route_table,
current_version: project.version().to_string(),
configured_producer: producer,
console_config: project.console_config.clone(),
};
Expand Down
143 changes: 102 additions & 41 deletions apps/framework-cli/src/cli/routines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,27 +80,28 @@
//!

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::{io::Error, path::PathBuf};

use log::debug;
use log::info;
use tokio::sync::RwLock;

use super::display::with_spinner_async;
use super::local_webserver::Webserver;
use super::watcher::FileWatcher;
use super::{Message, MessageType};
use crate::framework::controller::RouteMeta;
use crate::framework::schema::process_schema_file;
use crate::framework::controller::{
create_or_replace_version_sync, get_all_framework_objects, get_all_version_syncs,
process_objects, FrameworkObject, FrameworkObjectVersions, RouteMeta, SchemaVersion,
};
use crate::framework::sdks::generate_ts_sdk;
use crate::infrastructure::console::post_current_state_to_console;
use crate::infrastructure::olap;
use crate::infrastructure::olap::clickhouse::ConfiguredDBClient;
use crate::infrastructure::stream::redpanda;
use crate::project::Project;
use log::info;
use crate::utilities::package_managers;

use async_recursion::async_recursion;
use super::display::with_spinner_async;
use super::local_webserver::Webserver;
use super::watcher::FileWatcher;
use super::{Message, MessageType};

pub mod clean;
pub mod initialize;
Expand Down Expand Up @@ -237,7 +238,8 @@ pub async fn start_development_mode(project: Arc<Project>) -> anyhow::Result<()>
let mut route_table = HashMap::<PathBuf, RouteMeta>::new();

info!("Initializing project state");
initialize_project_state(project.clone(), &mut route_table).await?;
let framework_object_versions =
initialize_project_state(project.clone(), &mut route_table).await?;
let route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>> =
Box::leak(Box::new(RwLock::new(route_table)));

Expand All @@ -246,7 +248,7 @@ pub async fn start_development_mode(project: Arc<Project>) -> anyhow::Result<()>
let web_server = Webserver::new(server_config.host.clone(), server_config.port);
let file_watcher = FileWatcher::new();

file_watcher.start(project.clone(), route_table)?;
file_watcher.start(project.clone(), framework_object_versions, route_table)?;

info!("Starting web server...");

Expand All @@ -258,31 +260,101 @@ pub async fn start_development_mode(project: Arc<Project>) -> anyhow::Result<()>
async fn initialize_project_state(
project: Arc<Project>,
route_table: &mut HashMap<PathBuf, RouteMeta>,
) -> anyhow::Result<()> {
) -> anyhow::Result<FrameworkObjectVersions> {
let mut old_version_dir = project.internal_dir()?;
old_version_dir.push("versions");

let configured_client = olap::clickhouse::create_client(project.clickhouse_config.clone());
let producer = redpanda::create_producer(project.redpanda_config.clone());

info!("Checking for old version directories...");

let mut framework_object_versions =
FrameworkObjectVersions::new(project.version().to_string(), project.schemas_dir().clone());
match std::fs::read_dir(&old_version_dir) {
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
debug!("No old version directories found");
}
Ok(read_dir) => {
for entry in read_dir {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
let version = path.file_name().unwrap().to_str().unwrap().to_string();

debug!("Processing old version directory: {:?}", path);

let mut framework_objects = HashMap::new();
get_all_framework_objects(&mut framework_objects, &path, &version)?;

let mut compilable_objects = HashMap::new();
process_objects(
&framework_objects,
project.clone(),
&path,
&configured_client,
&mut compilable_objects,
route_table,
&version,
)
.await?;

framework_object_versions.previous_version_models.insert(
version,
SchemaVersion {
base_path: path,
models: framework_objects,
typescript_objects: compilable_objects,
},
);
}
}
}
Err(e) => Err(e)?,
};

let schema_dir = project.schemas_dir();
info!("Starting schema directory crawl...");

with_spinner_async("Processing schema file", async {
let crawl_result = process_schemas_in_dir(
schema_dir.as_path(),
let mut framework_objects: HashMap<String, FrameworkObject> = HashMap::new();
get_all_framework_objects(&mut framework_objects, &schema_dir, project.version())?;

let mut compilable_objects = HashMap::new();

let result = process_objects(
&framework_objects,
project.clone(),
&schema_dir,
&configured_client,
&mut compilable_objects,
route_table,
project.version(),
)
.await;

// TODO: add old versions to SDK
let sdk_location = generate_ts_sdk(project.clone(), &compilable_objects)?;
let package_manager = package_managers::PackageManager::Npm;
package_managers::install_packages(&sdk_location, &package_manager)?;
package_managers::run_build(&sdk_location, &package_manager)?;
package_managers::link_sdk(&sdk_location, None, &package_manager)?;

framework_object_versions.current_models = SchemaVersion {
base_path: schema_dir.clone(),
models: framework_objects.clone(),
typescript_objects: compilable_objects,
};

olap::clickhouse::check_ready(&configured_client).await?;
let _ = post_current_state_to_console(
project,
project.clone(),
&configured_client,
&producer,
route_table.clone(),
&framework_object_versions,
)
.await;

match crawl_result {
match result {
Ok(_) => {
info!("Schema directory crawl completed successfully");
Ok(())
Expand All @@ -295,29 +367,18 @@ async fn initialize_project_state(
}
})
.await?;
Ok(())
}

#[async_recursion]
async fn process_schemas_in_dir(
schema_dir: &Path,
project: Arc<Project>,
configured_client: &ConfiguredDBClient,
route_table: &mut HashMap<PathBuf, RouteMeta>,
) -> anyhow::Result<()> {
if schema_dir.is_dir() {
for entry in std::fs::read_dir(schema_dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
debug!("Processing directory: {:?}", path);
process_schemas_in_dir(&path, project.clone(), configured_client, route_table)
.await?;
} else {
debug!("Processing file: {:?}", path);
process_schema_file(&path, project.clone(), configured_client, route_table).await?
info!("Crawling version syncs");
with_spinner_async::<_, anyhow::Result<()>>("Setting up version syncs", {
async {
let version_syncs = get_all_version_syncs(&project, &framework_object_versions)?;
for vs in version_syncs {
create_or_replace_version_sync(vs, &configured_client).await?;
}
Ok(())
}
}
Ok(())
})
.await?;

Ok(framework_object_versions)
}
Loading

0 comments on commit 276dbc1

Please sign in to comment.