Surface errors from python runners (#1764)
DatGuyJonathan authored Sep 23, 2024 · 1 parent ad89204 · commit d76cb50
Showing 7 changed files with 94 additions and 55 deletions.
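
Across these files the change follows one pattern: instead of printing failures with print()/console.error(), the Python runner scripts (and their TypeScript counterparts) forward errors to the Moose CLI — cli_log(CliLogData(...)) in Python, cliLog({...}) in TypeScript — so failures surface in the CLI output rather than getting lost in the runner's stdout. A minimal sketch of the Python side of that pattern; the import and the CliLogData fields (action, message, message_type) are exactly what the diff below uses, while risky_step is a hypothetical stand-in for importing a block, loading a streaming function, or processing a message.

```python
from moose_lib import cli_log, CliLogData

def risky_step():
    # Hypothetical stand-in for work that can fail: importing a user's block
    # file, loading a streaming function, or handling a Kafka message.
    raise RuntimeError("boom")

try:
    risky_step()
except Exception as err:
    # Surface the failure to the CLI instead of printing it.
    cli_log(CliLogData(action="Blocks",
                       message=f"Failed to create blocks: {err}",
                       message_type="Error"))
```
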
34 changes: 21 additions & 13 deletions apps/framework-cli/src/cli/routines/consumption.rs
@@ -1,18 +1,28 @@
use std::{fs, io::Write};

use crate::{
cli::display::Message, framework::typescript::templates::BASE_CONSUMPTION_TEMPLATE,
cli::display::Message,
framework::{
python::templates::PYTHON_BASE_CONSUMPTION_TEMPLATE,
typescript::templates::TS_BASE_CONSUMPTION_TEMPLATE,
},
project::Project,
};

use crate::framework::languages::SupportedLanguages;

use super::{RoutineFailure, RoutineSuccess};

pub fn create_consumption_file(
project: &Project,
filename: String,
) -> Result<RoutineSuccess, RoutineFailure> {
let apis_dir = project.consumption_dir();
let apis_file_path = apis_dir.join(format!("{}.ts", filename));
let apis_file_path = apis_dir.join(format!("{}.{}", filename, project.language.extension()));
let template = match project.language {
SupportedLanguages::Typescript => TS_BASE_CONSUMPTION_TEMPLATE,
SupportedLanguages::Python => PYTHON_BASE_CONSUMPTION_TEMPLATE,
};

let mut apis_file = fs::File::create(&apis_file_path).map_err(|err| {
RoutineFailure::new(
@@ -24,17 +34,15 @@ pub fn create_consumption_file(
)
})?;

apis_file
.write_all(BASE_CONSUMPTION_TEMPLATE.as_bytes())
.map_err(|err| {
RoutineFailure::new(
Message::new(
"Failed".to_string(),
format!("to write to consumption file {}", apis_file_path.display()),
),
err,
)
})?;
apis_file.write_all(template.as_bytes()).map_err(|err| {
RoutineFailure::new(
Message::new(
"Failed".to_string(),
format!("to write to consumption file {}", apis_file_path.display()),
),
err,
)
})?;

Ok(RoutineSuccess::success(Message::new(
"Created".to_string(),
10 changes: 6 additions & 4 deletions apps/framework-cli/src/framework/python/scripts/blocks_runner.py
@@ -6,6 +6,8 @@
import os
import sys

from moose_lib import cli_log, CliLogData


parser = argparse.ArgumentParser(description='Run blocks')
parser.add_argument('blocks_dir_path', type=str,
@@ -68,7 +70,7 @@ def walk_dir(dir, file_extension):

for root, dirs, files in os.walk(dir):
for file in files:
if file.endswith(file_extension):
if file.endswith(file_extension) and file != '__init__.py':
file_list.append(os.path.join(root, file))

return file_list
@@ -82,7 +84,7 @@ def create_blocks(ch_client, path):
print(f"Creating block using query {query}")
ch_client.command(query)
except Exception as err:
print(f"Failed to create blocks: {err}")
cli_log(CliLogData(action="Blocks", message=f"Failed to create blocks: {err}", message_type="Error"))


def delete_blocks(ch_client, path):
@@ -93,7 +95,7 @@ def delete_blocks(ch_client, path):
print(f"Deleting block using query {query}")
ch_client.command(query)
except Exception as err:
print(f"Failed to delete blocks: {err}")
cli_log(CliLogData(action="Blocks", message=f"Failed to delete blocks: {err}", message_type="Error"))


async def async_worker(task):
@@ -116,7 +118,7 @@ async def main():
get_blocks_from_file(file)
block_files.append(file)
except Exception as err:
print(f"Skipping {file} due to error: {err}")
cli_log(CliLogData(action="Blocks", message=f"Failed to import blocks from {file}: {err}", message_type="Error"))

print(f"Found {len(block_files)} blocks in {blocks_dir_path}")
print(f"Blocks: {block_files}")
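
Condensed, the updated blocks runner behaves like the sketch below: `.py` files are collected while skipping `__init__.py`, and any file that fails to import is reported to the CLI and skipped. `walk_dir` is taken from the diff above; `collect_block_files` is a hedged condensation of the runner's main loop, with `get_blocks_from_file` passed in as a stand-in for the real loader.

```python
import os
from moose_lib import cli_log, CliLogData

def walk_dir(dir, file_extension):
    # Collect matching files, skipping package markers such as __init__.py.
    file_list = []
    for root, dirs, files in os.walk(dir):
        for file in files:
            if file.endswith(file_extension) and file != '__init__.py':
                file_list.append(os.path.join(root, file))
    return file_list

def collect_block_files(blocks_dir_path, get_blocks_from_file):
    # Try to load each candidate file; report failures to the CLI and keep going.
    block_files = []
    for file in walk_dir(blocks_dir_path, ".py"):
        try:
            get_blocks_from_file(file)
            block_files.append(file)
        except Exception as err:
            cli_log(CliLogData(action="Blocks",
                               message=f"Failed to import blocks from {file}: {err}",
                               message_type="Error"))
    return block_files
```
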
@@ -9,6 +9,8 @@
import threading
import time

from moose_lib import cli_log, CliLogData

class EnhancedJSONEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, datetime):
@@ -92,14 +94,16 @@ def get_max_message_size(config_json: str) -> int:
flow = import_module(function_file_name, package=function_file_dir)
flow_def = flow.StreamingFunction
except Exception as e:
error(f"Error importing streaming function: {e} in file {function_file_name}")
cli_log(CliLogData(action="Function", message=str(e), message_type="Error"))
sys.exit(1)

# Get all the named flows in the flow file and make sure the flow is of type Flow
flows = [f for f in dir(flow) if isinstance(getattr(flow, f), flow_def)]

# Make sure that there is only one flow in the file
if len(flows) != 1:
error(f"Expected one streaming function in the file, but got {len(flows)}")
cli_log(CliLogData(action="Function", message=f"Expected one streaming function in the file, but got {len(flows)}", message_type="Error"))
sys.exit(1)

# Get the dataclass that's the input to the flow run function

@@ -133,7 +137,7 @@ def parse_input(json_input):
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
else:
print("No sasl mechanism specified. Using default consumer.")
log("No sasl mechanism specified. Using default consumer.")
consumer = KafkaConsumer(
source_topic,
client_id= "python_flow_consumer",
@@ -186,28 +190,27 @@ def send_message_metrics_in():

# This is batched under-the-hood
for message in consumer:
# Parse the message into the input type
input_data = parse_input(message.value)

# Run the flow
output_data = flow_run(input_data)
try:
# Parse the message into the input type
input_data = parse_input(message.value)

# Handle flow function returning an array or a single object
output_data_list = output_data if isinstance(output_data, list) else [output_data]
# Run the flow
output_data = flow_run(input_data)

count_in += len(output_data_list)
# Handle flow function returning an array or a single object
output_data_list = output_data if isinstance(output_data, list) else [output_data]


requests.post("http://localhost:5000/logs", json={"message_type": "Success", 'action': 'Received',
'message': f'{source_topic} -> {target_topic} {len(output_data_list)} message(s)'})


for item in output_data_list:
# Ignore flow function returning null
if item is not None:
# send() is asynchronous. When called it adds the record to a buffer of pending record sends
# and immediately returns. This allows the producer to batch together individual records
bytes_count += len(json.dumps(item, cls=EnhancedJSONEncoder).encode('utf-8'))
producer.send(target_topic, json.dumps(item, cls=EnhancedJSONEncoder).encode('utf-8'))
count_out+=1
count_in += len(output_data_list)
cli_log(CliLogData(action="Received", message=f'{source_topic} -> {target_topic} {len(output_data_list)} message(s)'))

for item in output_data_list:
# Ignore flow function returning null
if item is not None:
# send() is asynchronous. When called it adds the record to a buffer of pending record sends
# and immediately returns. This allows the producer to batch together individual records
bytes_count += len(json.dumps(item, cls=EnhancedJSONEncoder).encode('utf-8'))
producer.send(target_topic, json.dumps(item, cls=EnhancedJSONEncoder).encode('utf-8'))
count_out += 1
except Exception as e:
cli_log(CliLogData(action="Function", message=str(e), message_type="Error"))
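
The net effect of the consumer-loop rewrite above: the whole parse → run → produce cycle is wrapped per message, so one bad message logs an error through the CLI and is skipped instead of killing the runner. A hedged sketch of that structure — `consumer`, `producer`, `parse_input`, and `flow_run` stand in for the objects the real runner builds, the metrics counters are omitted, and only the cli_log call is verbatim from the diff.

```python
import json
from moose_lib import cli_log, CliLogData

def consume_loop(consumer, producer, target_topic, parse_input, flow_run):
    # Each message is processed inside its own try/except so a single failure
    # is surfaced to the CLI and the loop keeps going.
    for message in consumer:
        try:
            input_data = parse_input(message.value)
            output_data = flow_run(input_data)
            # The flow may return a single object or a list of objects.
            output_list = output_data if isinstance(output_data, list) else [output_data]
            for item in output_list:
                if item is not None:  # ignore flows that return None
                    producer.send(target_topic, json.dumps(item).encode("utf-8"))
        except Exception as e:
            cli_log(CliLogData(action="Function", message=str(e), message_type="Error"))
```
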
10 changes: 10 additions & 0 deletions apps/framework-cli/src/framework/python/templates.rs
@@ -81,6 +81,16 @@ my_function = StreamingFunction(
)
"#;

pub static PYTHON_BASE_CONSUMPTION_TEMPLATE: &str = r#"
# This file is where you can define your API templates for consuming your data
# All query_params are passed in as strings, and are used within the sql tag to parameterize you queries
from moose_lib import MooseClient
def run(client: MooseClient, params):
return client.query("SELECT 1", { })
"#;

pub static PYTHON_BASE_API_SAMPLE: &str = r#"
from moose_lib import MooseClient
4 changes: 2 additions & 2 deletions apps/framework-cli/src/framework/typescript/templates.rs
@@ -136,7 +136,7 @@ export default {
} as Blocks;
"#;

pub static BASE_CONSUMPTION_TEMPLATE: &str = r#"
pub static TS_BASE_CONSUMPTION_TEMPLATE: &str = r#"
import { ConsumptionUtil } from "@514labs/moose-lib";
// This file is where you can define your API templates for consuming your data
@@ -150,7 +150,7 @@ export default async function handle(
{ client, sql }: ConsumptionUtil
) {
return client.query(sql``);
return client.query(sql`SELECT 1`);
}
"#;

36 changes: 26 additions & 10 deletions packages/ts-moose-lib/src/blocks/runner.ts
@@ -1,9 +1,10 @@
import process from "node:process";
import { ClickHouseClient } from "@clickhouse/client-web";
import fastq, { queueAsPromised } from "fastq";
import { getClickhouseClient, walkDir } from "../commons";
import { cliLog, getClickhouseClient, walkDir } from "../commons";
import { Blocks } from "./helpers";


interface BlocksQueueTask {
chClient: ClickHouseClient;
blocks: Blocks;
@@ -45,7 +46,11 @@ const createBlocks = async (chClient: ClickHouseClient, blocks: Blocks) => {
console.log(`Creating block using query ${query}`);
await chClient.command({ query });
} catch (err) {
console.error(`Failed to create a block: ${err}`);
cliLog({
action: "Blocks",
message: `Failed to create blocks: ${err}`,
message_type: "Error",
});
if (err && JSON.stringify(err).includes(`UNKNOWN_TABLE`)) {
throw new DependencyError(err.toString());
}
@@ -59,7 +64,11 @@ const deleteBlocks = async (chClient: ClickHouseClient, blocks: Blocks) => {
console.log(`Deleting block using query ${query}`);
await chClient.command({ query });
} catch (err) {
console.error(`Failed to delete block: ${err}`);
cliLog({
action: "Blocks",
message: `Failed to delete blocks: ${err}`,
message_type: "Error",
});
}
}
};
@@ -90,13 +99,20 @@ export const runBlocks = async () => {
for (const path of blocksFiles) {
console.log(`Adding to queue: ${path}`);

const blocks = require(path).default as Blocks;

queue.push({
chClient,
blocks,
retries: numOfBlockFiles,
});
try {
const blocks = require(path).default as Blocks;
queue.push({
chClient,
blocks,
retries: numOfBlockFiles,
});
} catch (err) {
cliLog({
action: "Blocks",
message: `Failed to import blocks from ${path}: ${err}`,
message_type: "Error",
});
}
}

while (!queue.idle()) {
2 changes: 1 addition & 1 deletion packages/ts-moose-lib/src/streaming-functions/runner.ts
@@ -254,7 +254,7 @@ const startConsumer = async (
args.functionFilePath.substring(0, args.functionFilePath.length - 3),
);
} catch (e) {
cliLog({ action: "Code failed to load", message: `${e}` });
cliLog({ action: "Function", message: `${e}`, message_type: "Error" });
throw e;
}
const streamingFunction: StreamingFunction = streamingFunctionImport.default;