Skip to content

Commit

Permalink
chore: change execution model for ts node processes (#1758)
Browse files Browse the repository at this point in the history
  • Loading branch information
callicles authored Sep 20, 2024
1 parent 48909b5 commit 8840be6
Show file tree
Hide file tree
Showing 38 changed files with 813 additions and 345 deletions.
2 changes: 1 addition & 1 deletion apps/framework-cli/src/cli/local_webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::framework::data_model::config::EndpointIngestionFormat;
use crate::infrastructure::stream::redpanda;
use crate::infrastructure::stream::redpanda::ConfiguredProducer;

use crate::framework::typescript::ts_node::CliMessage;
use crate::framework::typescript::bin::CliMessage;
use crate::project::Project;
use bytes::Buf;
use http_body_util::BodyExt;
Expand Down
18 changes: 14 additions & 4 deletions apps/framework-cli/src/cli/routines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,10 @@ pub async fn start_development_mode(

let topics = fetch_topics(&project.redpanda_config).await?;

let mut function_process_registry =
FunctionProcessRegistry::new(project.redpanda_config.clone());
let mut function_process_registry = FunctionProcessRegistry::new(
project.redpanda_config.clone(),
project.project_location.clone(),
);
// Once the below function is optimized to act on events, this
// will need to get refactored out.

Expand All @@ -385,6 +387,7 @@ pub async fn start_development_mode(
let mut blocks_process_registry = AggregationProcessRegistry::new(
project.language,
project.blocks_dir(),
project.project_location.clone(),
project.clickhouse_config.clone(),
false,
);
Expand All @@ -393,6 +396,7 @@ pub async fn start_development_mode(
let mut aggregations_process_registry = AggregationProcessRegistry::new(
project.language,
project.aggregations_dir(),
project.project_location.clone(),
project.clickhouse_config.clone(),
true,
);
Expand All @@ -402,6 +406,7 @@ pub async fn start_development_mode(
project.language,
project.clickhouse_config.clone(),
project.consumption_dir(),
project.project_location.clone(),
);
process_consumption_changes(
&project,
Expand Down Expand Up @@ -544,8 +549,10 @@ pub async fn start_production_mode(
.start_all(&framework_object_versions, &version_syncs, metrics.clone())
.await;

let mut function_process_registry =
FunctionProcessRegistry::new(project.redpanda_config.clone());
let mut function_process_registry = FunctionProcessRegistry::new(
project.redpanda_config.clone(),
project.project_location.clone(),
);
// Once the below function is optimized to act on events, this
// will need to get refactored out.
process_streaming_func_changes(
Expand All @@ -558,13 +565,15 @@ pub async fn start_production_mode(
let mut blocks_process_registry = AggregationProcessRegistry::new(
project.language,
project.blocks_dir(),
project.project_location.clone(),
project.clickhouse_config.clone(),
false,
);
process_aggregations_changes(&mut blocks_process_registry).await?;
let mut aggregations_process_registry = AggregationProcessRegistry::new(
project.language,
project.aggregations_dir(),
project.project_location.clone(),
project.clickhouse_config.clone(),
true,
);
Expand All @@ -574,6 +583,7 @@ pub async fn start_production_mode(
project.language,
project.clickhouse_config.clone(),
project.consumption_dir(),
project.project_location.clone(),
);
process_consumption_changes(
&project,
Expand Down
1 change: 1 addition & 0 deletions apps/framework-cli/src/framework/core/code_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ pub async fn get_framework_objects_from_schema_file(

let data_models_configs = data_model::config::get(
path,
&project.project_location,
framework_objects
.enums
.iter()
Expand Down
1 change: 1 addition & 0 deletions apps/framework-cli/src/framework/core/primitive_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ impl PrimitiveMap {

let data_models_configs = data_model::config::get(
path,
&project.project_location,
file_objects
.enums
.iter()
Expand Down
3 changes: 2 additions & 1 deletion apps/framework-cli/src/framework/data_model/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,11 @@ async fn parse_python_model_file(

pub async fn get(
path: &Path,
project_path: &Path,
enums: HashSet<&str>,
) -> Result<HashMap<ConfigIdentifier, DataModelConfig>, ModelConfigurationError> {
if path.extension() == Some(OsStr::new("ts")) {
let config = get_data_model_configs(path, enums).await?;
let config = get_data_model_configs(path, project_path, enums).await?;
info!("Data Model configuration for {:?}: {:?}", path, config);
Ok(config)
} else if path.extension() == Some(OsStr::new("py"))
Expand Down
2 changes: 1 addition & 1 deletion apps/framework-cli/src/framework/typescript.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
pub mod aggregation;
pub mod bin;
pub mod consumption;
pub mod export_collectors;
pub mod generator;
pub mod parser;
pub mod streaming;
pub mod templates;
pub mod ts_node;
11 changes: 6 additions & 5 deletions apps/framework-cli/src/framework/typescript/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ use tokio::process::Child;
use crate::framework::aggregations::model::AggregationError;
use crate::infrastructure::olap::clickhouse::config::ClickHouseConfig;

use super::ts_node;
use super::bin;

const AGGREGATION_RUNNER_WRAPPER: &str = include_str!("ts_scripts/aggregation.ts");
const BLOCKS_RUNNER_WRAPPER: &str = include_str!("ts_scripts/blocks.ts");
const BLOCKS_RUNNER_BIN: &str = "blocks";
const AGGREGATIONS_RUNNER_BIN: &str = "aggregations";

// TODO: Abstract away ClickhouseConfig to support other databases
// TODO: Bubble up compilation errors to the user
pub fn run(
clickhouse_config: ClickHouseConfig,
aggregations_path: &Path,
is_blocks: bool,
project_path: &Path,
) -> Result<Child, AggregationError> {
let host_port = clickhouse_config.host_port.to_string();
let use_ssl = clickhouse_config.use_ssl.to_string();
Expand All @@ -31,9 +32,9 @@ pub fn run(
];

let mut aggregation_process = if is_blocks {
ts_node::run(BLOCKS_RUNNER_WRAPPER, &args)?
bin::run(BLOCKS_RUNNER_BIN, project_path, &args)?
} else {
ts_node::run(AGGREGATION_RUNNER_WRAPPER, &args)?
bin::run(AGGREGATIONS_RUNNER_BIN, project_path, &args)?
};

let stdout = aggregation_process
Expand Down
35 changes: 35 additions & 0 deletions apps/framework-cli/src/framework/typescript/bin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::{cli::display::MessageType, utilities::constants::TSCONFIG_JSON};
use serde::Deserialize;
use std::{path::Path, process::Stdio};

use tokio::process::{Child, Command};

#[derive(Deserialize)]
pub struct CliMessage {
pub message_type: MessageType,
pub action: String,
pub message: String,
}

const RUNNER_COMMAND: &str = "moose-runner";

pub fn run(
binary_command: &str,
project_path: &Path,
args: &[&str],
) -> Result<Child, std::io::Error> {
let mut command = Command::new(RUNNER_COMMAND);

command.arg(binary_command);

command.env("TS_NODE_PROJECT", project_path.join(TSCONFIG_JSON));

for arg in args {
command.arg(arg);
}

command
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
}
7 changes: 4 additions & 3 deletions apps/framework-cli/src/framework/typescript/consumption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ use tokio::process::Child;
use crate::infrastructure::olap::clickhouse::config::ClickHouseConfig;
use crate::infrastructure::processes::consumption_registry::ConsumptionError;

use super::ts_node;
use super::bin;

const CONSUMPTION_RUNNER_WRAPPER: &str = include_str!("ts_scripts/consumption-api.ts");
const CONSUMPTION_RUNNER_BIN: &str = "consumption-apis";

// TODO: Abstract away ClickhouseConfig to support other databases
// TODO: Bubble up compilation errors to the user
pub fn run(
clickhouse_config: ClickHouseConfig,
consumption_path: &Path,
project_path: &Path,
) -> Result<Child, ConsumptionError> {
let host_port = clickhouse_config.host_port.to_string();
let use_ssl = clickhouse_config.use_ssl.to_string();
Expand All @@ -28,7 +29,7 @@ pub fn run(
&use_ssl,
];

let mut consumption_process = ts_node::run(CONSUMPTION_RUNNER_WRAPPER, &args)?;
let mut consumption_process = bin::run(CONSUMPTION_RUNNER_BIN, project_path, &args)?;

let stdout = consumption_process
.stdout
Expand Down
11 changes: 6 additions & 5 deletions apps/framework-cli/src/framework/typescript/export_collectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use std::path::Path;
use serde_json::Value;
use tokio::io::AsyncReadExt;

use super::ts_node::run;
use super::bin;
use crate::framework::data_model::config::{ConfigIdentifier, DataModelConfig};

const MODULE_EXPORT_SERIALIZER: &str = include_str!("ts_scripts/moduleExportSerializer.ts");
const EXPORT_SERIALIZER_BIN: &str = "export-serializer";

#[derive(Debug, thiserror::Error)]
#[error("Failed to run code")]
Expand All @@ -21,13 +21,13 @@ pub enum ExportCollectorError {
},
}

async fn collect_exports(file: &Path) -> Result<Value, ExportCollectorError> {
async fn collect_exports(file: &Path, project_path: &Path) -> Result<Value, ExportCollectorError> {
let file_path_str = file.to_str().ok_or(ExportCollectorError::Other {
message: "Did not get a proper file path to load exports from".to_string(),
})?;

let args = vec![file_path_str];
let process = run(MODULE_EXPORT_SERIALIZER, &args)?;
let process = bin::run(EXPORT_SERIALIZER_BIN, project_path, &args)?;

let mut stdout = process
.stdout
Expand Down Expand Up @@ -57,9 +57,10 @@ async fn collect_exports(file: &Path) -> Result<Value, ExportCollectorError> {

pub async fn get_data_model_configs(
file: &Path,
project_path: &Path,
enums: HashSet<&str>,
) -> Result<HashMap<ConfigIdentifier, DataModelConfig>, ExportCollectorError> {
let exports = collect_exports(file).await?;
let exports = collect_exports(file, project_path).await?;

match exports {
Value::Object(map) => {
Expand Down
37 changes: 25 additions & 12 deletions apps/framework-cli/src/framework/typescript/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ pub fn extract_data_model_from_file(
let internal = project.internal_dir().unwrap();
let output_dir = internal.join("serialized_datamodels");

log::info!("Extracting data model from file: {:?}", path);

fs::write(
internal.join(TSCONFIG_JSON),
json!({
"compilerOptions":{
"outDir": "dist", // relative path, so .moose/dist
"plugins": [{
"transform": "../node_modules/@514labs/moose-lib/dist/toDataModels.js"
"transform": "../node_modules/@514labs/moose-lib/dist/dataModels/toDataModels.js"
}],
"strict":true
},
Expand All @@ -60,15 +62,25 @@ pub fn extract_data_model_from_file(
}
},
)?;
let ts_return_code = Command::new("npx")
.arg("tspc")

let ts_return_code = Command::new("moose-tspc")
.arg("--project")
.arg(format!(".moose/{}", TSCONFIG_JSON))
.env("NPM_CONFIG_UPDATE_NOTIFIER", "false")
.current_dir(&project.project_location)
.spawn()?
.spawn()
.map_err(|err| {
log::error!("Error while starting moose-tspc: {}", err);
TypescriptParsingError::TypescriptCompilerError(Some(err))
})?
.wait()
.map_err(|err| TypescriptParsingError::TypescriptCompilerError(Some(err)))?;
.map_err(|err| {
log::error!("Error while running moose-tspc: {}", err);
TypescriptParsingError::TypescriptCompilerError(Some(err))
})?;

log::info!("Typescript compiler return code: {:?}", ts_return_code);

if !ts_return_code.success() {
return Err(TypescriptParsingError::TypescriptCompilerError(None));
}
Expand All @@ -87,6 +99,7 @@ pub fn extract_data_model_from_file(

let mut output_json = serde_json::from_slice::<Value>(&output)
.map_err(|_| TypescriptParsingError::TypescriptCompilerError(None))?;

if let Some(error_type) = output_json.get("error_type") {
if let Some(error_type) = error_type.as_str() {
if error_type == "unknown_type" {
Expand Down Expand Up @@ -169,18 +182,18 @@ mod tests {
.wait()
.unwrap();

Command::new("rm")
.arg("-rf")
.arg("./tests/test_project/node_modules/@514labs/moose-lib/dist/")
Command::new("npm")
.arg("link")
.current_dir("../../packages/ts-moose-lib")
.spawn()
.unwrap()
.wait()
.unwrap();

Command::new("cp")
.arg("-r")
.arg("../../packages/ts-moose-lib/dist/")
.arg("./tests/test_project/node_modules/@514labs/moose-lib/dist/")
Command::new("npm")
.arg("link")
.arg("@514labs/moose-lib")
.current_dir("./tests/test_project")
.spawn()
.unwrap()
.wait()
Expand Down
7 changes: 4 additions & 3 deletions apps/framework-cli/src/framework/typescript/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use tokio::process::Child;

use crate::infrastructure::stream::redpanda::RedpandaConfig;

use super::ts_node;
use super::bin;

const FUNCTION_RUNNER_WRAPPER: &str = include_str!("ts_scripts/streaming-function.ts");
const FUNCTION_RUNNER_BIN: &str = "streaming-functions";

// TODO: we currently refer redpanda configuration here. If we want to be able to
// abstract this to other type of streaming engine, we will need to be able to abstract this away.
Expand All @@ -17,6 +17,7 @@ pub fn run(
target_topic: &str,
target_topic_config: &str,
streaming_function_file: &Path,
project_path: &Path,
// TODO Remove the anyhow type here
) -> Result<Child, std::io::Error> {
let mut args = vec![
Expand Down Expand Up @@ -48,7 +49,7 @@ pub fn run(
args.push(redpanda_config.security_protocol.as_ref().unwrap());
}

let mut streaming_function_process = ts_node::run(FUNCTION_RUNNER_WRAPPER, &args)?;
let mut streaming_function_process = bin::run(FUNCTION_RUNNER_BIN, project_path, &args)?;

let stdout = streaming_function_process
.stdout
Expand Down
Loading

0 comments on commit 8840be6

Please sign in to comment.