Skip to content

Commit

Permalink
feat: working ts jwt implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
callicles authored Sep 25, 2024
1 parent 59160e4 commit 0af2a9a
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 51 deletions.
16 changes: 13 additions & 3 deletions apps/framework-cli/src/cli/local_webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ async fn create_client(
.path()
.strip_prefix("/consumption/")
.unwrap_or(cleaned_path);

if !consumption_apis.contains(consumption_name) {
if !is_prod {
println!(
Expand Down Expand Up @@ -194,12 +195,18 @@ async fn create_client(

let authority = url.authority().unwrap().clone();

let req = Request::builder()
let mut new_req: Request<Full<Bytes>> = Request::builder()
.uri(cleaned_path)
.header(hyper::header::HOST, authority.as_str())
.body(Full::new(Bytes::new()))?;

let res = sender.send_request(req).await?;
let headers = new_req.headers_mut();
for (key, value) in req.headers() {
headers.insert(key, value.clone());
}

let res = sender.send_request(new_req).await?;
let status = res.status();
let body = res.collect().await.unwrap().to_bytes().to_vec();
metrics
.send_metric(MetricsMessage::PutConsumedBytesCount {
Expand All @@ -210,7 +217,7 @@ async fn create_client(
.await;

Ok(Response::builder()
.status(StatusCode::OK)
.status(status)
.header("Access-Control-Allow-Origin", "*")
.header("Access-Control-Allow-Method", "GET, POST")
.header("Access-Control-Allow-Headers", "Content-Type")
Expand Down Expand Up @@ -596,6 +603,9 @@ fn get_env_var(s: &str) -> Option<String> {
}
}

// TODO should we move this to the project config?
// Since it automatically loads the env var and orverrides local file settings
//That way, a user can set dev variables easily and override them in prod with env vars.
lazy_static! {
static ref MOOSE_CONSUMPTION_API_KEY: Option<String> = get_env_var("MOOSE_CONSUMPTION_API_KEY");
static ref MOOSE_INGEST_API_KEY: Option<String> = get_env_var("MOOSE_INGEST_API_KEY");
Expand Down
2 changes: 2 additions & 0 deletions apps/framework-cli/src/cli/routines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ pub async fn start_development_mode(
let mut consumption_process_registry = ConsumptionProcessRegistry::new(
project.language,
project.clickhouse_config.clone(),
project.jwt.clone(),
project.consumption_dir(),
project.project_location.clone(),
);
Expand Down Expand Up @@ -582,6 +583,7 @@ pub async fn start_production_mode(
let mut consumption_process_registry = ConsumptionProcessRegistry::new(
project.language,
project.clickhouse_config.clone(),
project.jwt.clone(),
project.consumption_dir(),
project.project_location.clone(),
);
Expand Down
27 changes: 27 additions & 0 deletions apps/framework-cli/src/framework/typescript/consumption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::process::Child;

use crate::infrastructure::olap::clickhouse::config::ClickHouseConfig;
use crate::infrastructure::processes::consumption_registry::ConsumptionError;
use crate::project::JwtConfig;

use super::bin;

Expand All @@ -14,11 +15,33 @@ const CONSUMPTION_RUNNER_BIN: &str = "consumption-apis";
// TODO: Bubble up compilation errors to the user
pub fn run(
clickhouse_config: ClickHouseConfig,
jwt_config: Option<JwtConfig>,
consumption_path: &Path,
project_path: &Path,
) -> Result<Child, ConsumptionError> {
let host_port = clickhouse_config.host_port.to_string();
let use_ssl = clickhouse_config.use_ssl.to_string();

let jwt_secret = jwt_config
.as_ref()
.map(|jwt| jwt.secret.clone())
.unwrap_or("".to_string());

let jwt_issuer = jwt_config
.as_ref()
.map(|jwt| jwt.issuer.clone())
.unwrap_or("".to_string());

let jwt_audience = jwt_config
.as_ref()
.map(|jwt| jwt.audience.clone())
.unwrap_or("".to_string());

let enforce_on_all_consumptions_apis = jwt_config
.as_ref()
.map(|jwt| jwt.enforce_on_all_consumptions_apis.to_string())
.unwrap_or("false".to_string());

let args = vec![
consumption_path.to_str().unwrap(),
&clickhouse_config.db_name,
Expand All @@ -27,6 +50,10 @@ pub fn run(
&clickhouse_config.user,
&clickhouse_config.password,
&use_ssl,
&jwt_secret,
&jwt_issuer,
&jwt_audience,
&enforce_on_all_consumptions_apis,
];

let mut consumption_process = bin::run(CONSUMPTION_RUNNER_BIN, project_path, &args)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::process::Child;
use crate::{
framework::{languages::SupportedLanguages, python, typescript},
infrastructure::olap::clickhouse::config::ClickHouseConfig,
project::JwtConfig,
utilities::system::{kill_child, KillProcessError},
};

Expand All @@ -25,12 +26,14 @@ pub struct ConsumptionProcessRegistry {
dir: PathBuf,
language: SupportedLanguages,
project_path: PathBuf,
jwt_config: Option<JwtConfig>,
}

impl ConsumptionProcessRegistry {
pub fn new(
language: SupportedLanguages,
clickhouse_config: ClickHouseConfig,
jwt_config: Option<JwtConfig>,
dir: PathBuf,
project_path: PathBuf,
) -> Self {
Expand All @@ -40,6 +43,7 @@ impl ConsumptionProcessRegistry {
dir,
clickhouse_config,
project_path,
jwt_config,
}
}

Expand All @@ -52,6 +56,7 @@ impl ConsumptionProcessRegistry {
}
SupportedLanguages::Typescript => typescript::consumption::run(
self.clickhouse_config.clone(),
self.jwt_config.clone(),
&self.dir,
&self.project_path,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl ProcessRegistries {
let consumption = ConsumptionProcessRegistry::new(
project.language,
project.clickhouse_config.clone(),
project.jwt.clone(),
project.consumption_dir(),
project.project_location.clone(),
);
Expand Down
12 changes: 12 additions & 0 deletions apps/framework-cli/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ pub struct Project {

#[serde(default = "HashMap::new")]
pub supported_old_versions: HashMap<String, String>,
#[serde(default)]
pub jwt: Option<JwtConfig>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct JwtConfig {
#[serde(default)]
pub enforce_on_all_consumptions_apis: bool,
pub secret: String,
pub issuer: String,
pub audience: String,
}

pub struct AggregationSet {
Expand Down Expand Up @@ -169,6 +180,7 @@ impl Project {
language_project_config,
supported_old_versions: HashMap::new(),
git_config: GitConfig::default(),
jwt: None,
}
}

Expand Down
3 changes: 2 additions & 1 deletion packages/ts-moose-lib/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
"kafkajs": "2.2.4",
"ts-patch": "~3.2.0",
"ts-node": "10.9.2",
"typescript": "~5.4.0"
"typescript": "~5.4.0",
"jose": "5.9.2"
},
"devDependencies": {
"@repo/ts-config": "workspace:*",
Expand Down
146 changes: 99 additions & 47 deletions packages/ts-moose-lib/src/consumption-apis/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import http from "http";
import process from "node:process";
import { getClickhouseClient } from "../commons";
import { MooseClient, sql } from "./helpers";
import * as jose from "jose";

export const antiCachePath = (path: string) =>
`${path}?num=${Math.random().toString()}&time=${Date.now()}`;
Expand All @@ -17,6 +18,10 @@ const [
CLICKHOUSE_USERNAME,
CLICKHOUSE_PASSWORD,
CLICKHOUSE_USE_SSL,
JWT_SECRET, // Optional we will need to bring a proper cli parsing tool to help to make sure this is more resilient. or make it one json object
JWT_ISSUER, // Optional
JWT_AUDIENCE, // Optional
ENFORCE_ON_ALL_CONSUMPTIONS_APIS, // Optional
] = process.argv;

const clickhouseConfig = {
Expand All @@ -30,64 +35,111 @@ const clickhouseConfig = {

const createPath = (path: string) => `${CONSUMPTION_DIR_PATH}${path}.ts`;

const apiHandler = async (
req: http.IncomingMessage,
res: http.ServerResponse,
) => {
try {
const url = new URL(req.url || "", "https://localhost");
const fileName = url.pathname;

const pathName = createPath(fileName);

const paramsObject = Array.from(url.searchParams.entries()).reduce(
(obj: { [key: string]: any }, [key, value]) => {
if (obj[key]) {
if (Array.isArray(obj[key])) {
obj[key].push(value);
} else {
obj[key] = [obj[key], value];
const apiHandler =
(publicKey: jose.KeyLike | undefined) =>
async (req: http.IncomingMessage, res: http.ServerResponse) => {
try {
const url = new URL(req.url || "", "https://localhost");
const fileName = url.pathname;

let jwtPayload;
if (publicKey && JWT_ISSUER && JWT_AUDIENCE) {
const jwt = req.headers.authorization?.split(" ")[1]; // Bearer <token>
if (jwt) {
try {
const { payload } = await jose.jwtVerify(jwt, publicKey, {
issuer: JWT_ISSUER,
audience: JWT_AUDIENCE,
});
jwtPayload = payload;
} catch (error) {
console.log("JWT verification failed");
if (ENFORCE_ON_ALL_CONSUMPTIONS_APIS === "true") {
res.writeHead(401, { "Content-Type": "application/json" });
res.end(JSON.stringify({ error: "Unauthorized" }));
return;
}
}
} else {
obj[key] = value;
} else if (ENFORCE_ON_ALL_CONSUMPTIONS_APIS === "true") {
res.writeHead(401, { "Content-Type": "application/json" });
res.end(JSON.stringify({ error: "Unauthorized" }));
return;
}
return obj;
},
{},
);
} else if (ENFORCE_ON_ALL_CONSUMPTIONS_APIS === "true") {
res.writeHead(401, { "Content-Type": "application/json" });
res.end(JSON.stringify({ error: "Unauthorized" }));
return;
}

const userFuncModule = require(pathName);
const pathName = createPath(fileName);

const result = await userFuncModule.default(paramsObject, {
client: new MooseClient(getClickhouseClient(clickhouseConfig)),
sql: sql,
});
const paramsObject = Array.from(url.searchParams.entries()).reduce(
(obj: { [key: string]: any }, [key, value]) => {
if (obj[key]) {
if (Array.isArray(obj[key])) {
obj[key].push(value);
} else {
obj[key] = [obj[key], value];
}
} else {
obj[key] = value;
}
return obj;
},
{},
);

let body: string;
const userFuncModule = require(pathName);

// TODO investigate why these prototypes are different
if (Object.getPrototypeOf(result).constructor.name === "ResultSet") {
body = JSON.stringify(await result.json());
} else {
body = JSON.stringify(result);
}
const result = await userFuncModule.default(paramsObject, {
client: new MooseClient(getClickhouseClient(clickhouseConfig)),
sql: sql,
jwt: jwtPayload,
});

res.writeHead(200, { "Content-Type": "application/json" });
res.end(body);
} catch (error: any) {
if (error instanceof Error) {
res.writeHead(500, { "Content-Type": "application/json" });
res.end(JSON.stringify({ error: error.message }));
} else {
res.writeHead(500, { "Content-Type": "application/json" });
res;
let body: string;
let status: number | undefined;

// TODO investigate why these prototypes are different
if (Object.getPrototypeOf(result).constructor.name === "ResultSet") {
body = JSON.stringify(await result.json());
} else {
if ("body" in result && "status" in result) {
body = JSON.stringify(result.body);
status = result.status;
} else {
body = JSON.stringify(result);
}
}

if (status) {
res.writeHead(status, { "Content-Type": "application/json" });
} else {
res.writeHead(200, { "Content-Type": "application/json" });
}

res.end(body);
} catch (error: any) {
if (error instanceof Error) {
res.writeHead(500, { "Content-Type": "application/json" });
res.end(JSON.stringify({ error: error.message }));
} else {
res.writeHead(500, { "Content-Type": "application/json" });
res;
}
}
}
};
};

export const runConsumptionApis = async () => {
console.log("Starting API service");
const server = http.createServer(apiHandler);

let publicKey;
if (JWT_SECRET) {
console.log("Importing JWT public key...");
publicKey = await jose.importSPKI(JWT_SECRET, "RS256");
}

const server = http.createServer(apiHandler(publicKey));

process.on("SIGTERM", async () => {
console.log("Received SIGTERM, shutting down...");
Expand Down
2 changes: 2 additions & 0 deletions packages/ts-moose-lib/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { JWTPayload } from "jose";
import { MooseClient, sql } from "./consumption-apis/helpers";
export interface Aggregation {
select: string;
Expand All @@ -11,6 +12,7 @@ export interface ConsumptionUtil {

// SQL interpolator
sql: typeof sql;
jwt: JWTPayload | undefined;
}

export enum IngestionFormat {
Expand Down
Loading

0 comments on commit 0af2a9a

Please sign in to comment.