Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operation dispatcher FSM #77

Merged
merged 16 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 97 additions & 20 deletions src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,16 +486,7 @@ impl TryFrom<PluginConfiguration> for FilterConfig {
let services = config
.extensions
.into_iter()
.map(|(name, ext)| {
(
name,
Rc::new(GrpcService::new(
ext.extension_type,
ext.endpoint,
ext.failure_mode,
)),
)
})
.map(|(name, ext)| (name, Rc::new(GrpcService::new(Rc::new(ext)))))
.collect();

Ok(Self {
Expand All @@ -505,18 +496,19 @@ impl TryFrom<PluginConfiguration> for FilterConfig {
}
}

#[derive(Deserialize, Debug, Clone, Default)]
#[derive(Deserialize, Debug, Clone, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum FailureMode {
#[default]
Deny,
Allow,
}

#[derive(Deserialize, Debug, Clone)]
#[derive(Deserialize, Debug, Clone, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ExtensionType {
Auth,
#[default]
RateLimit,
}

Expand All @@ -527,7 +519,7 @@ pub struct PluginConfiguration {
pub policies: Vec<Policy>,
}

#[derive(Deserialize, Debug, Clone)]
#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "camelCase")]
pub struct Extension {
#[serde(rename = "type")]
Expand All @@ -537,6 +529,14 @@ pub struct Extension {
pub failure_mode: FailureMode,
}

#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Action {
pub extension: String,
#[allow(dead_code)]
pub data: DataType,
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -587,7 +587,18 @@ mod test {
"selector": "auth.metadata.username"
}
}]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;

Expand Down Expand Up @@ -682,7 +693,18 @@ mod test {
"default": "my_selector_default_value"
}
}]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;
let res = serde_json::from_str::<PluginConfiguration>(config);
Expand Down Expand Up @@ -759,7 +781,18 @@ mod test {
}]
}],
"data": [ { "selector": { "selector": "my.selector.path" } }]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;
let res = serde_json::from_str::<PluginConfiguration>(config);
Expand Down Expand Up @@ -825,7 +858,18 @@ mod test {
"selector": "auth.metadata.username"
}
}]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;
let res = serde_json::from_str::<PluginConfiguration>(config);
Expand Down Expand Up @@ -872,7 +916,18 @@ mod test {
"selector": "auth.metadata.username"
}
}]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;
let res = serde_json::from_str::<PluginConfiguration>(bad_config);
Expand Down Expand Up @@ -902,7 +957,18 @@ mod test {
"value": "1"
}
}]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;
let res = serde_json::from_str::<PluginConfiguration>(bad_config);
Expand Down Expand Up @@ -934,7 +1000,18 @@ mod test {
}]
}],
"data": [ { "selector": { "selector": "my.selector.path" } }]
}]
}],
"actions": [
{
"extension": "limitador",
"data": {
"static": {
"key": "rlp-ns-A/rlp-name-A",
"value": "1"
}
}
}
]
}]
}"#;
let res = serde_json::from_str::<PluginConfiguration>(bad_config);
Expand Down
47 changes: 20 additions & 27 deletions src/filter/http_context.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::configuration::{FailureMode, FilterConfig};
use crate::envoy::{RateLimitResponse, RateLimitResponse_Code};
use crate::operation_dispatcher::OperationDispatcher;
use crate::policy::Policy;
use crate::service::rate_limit::RateLimitService;
use crate::service::{GrpcServiceHandler, HeaderResolver};
use log::{debug, warn};
use protobuf::Message;
use proxy_wasm::traits::{Context, HttpContext};
Expand All @@ -13,7 +12,7 @@ pub struct Filter {
pub context_id: u32,
pub config: Rc<FilterConfig>,
pub response_headers_to_add: Vec<(String, String)>,
pub header_resolver: Rc<HeaderResolver>,
pub operation_dispatcher: OperationDispatcher,
}

impl Filter {
Expand All @@ -40,33 +39,27 @@ impl Filter {
return Action::Continue;
}

// todo(adam-cattermole): For now we just get the first GrpcService but we expect to have
// an action which links to the service that should be used
let rls = self
.config
.services
.values()
.next()
.expect("expect a value");

let handler = GrpcServiceHandler::new(Rc::clone(rls), Rc::clone(&self.header_resolver));
let message = RateLimitService::message(rlp.domain.clone(), descriptors);
self.operation_dispatcher.build_operations(rlp, descriptors);

match handler.send(message) {
Ok(call_id) => {
debug!(
"#{} initiated gRPC call (id# {}) to Limitador",
self.context_id, call_id
);
Action::Pause
}
Err(e) => {
warn!("gRPC call to Limitador failed! {e:?}");
if let FailureMode::Deny = rls.failure_mode() {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
if let Some(operation) = self.operation_dispatcher.next() {
match operation.get_result() {
Ok(call_id) => {
debug!(
"#{} initiated gRPC call (id# {}) to Limitador",
self.context_id, call_id
);
Action::Pause
}
Err(e) => {
warn!("gRPC call to Limitador failed! {e:?}");
if let FailureMode::Deny = operation.get_failure_mode() {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
Action::Continue
}
Action::Continue
}
} else {
Action::Continue
}
}

Expand Down
18 changes: 16 additions & 2 deletions src/filter/root_context.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::configuration::{FilterConfig, PluginConfiguration};
use crate::filter::http_context::Filter;
use crate::service::HeaderResolver;
use crate::operation_dispatcher::OperationDispatcher;
use crate::service::{GrpcServiceHandler, HeaderResolver};
use const_format::formatcp;
use log::{debug, error, info};
use proxy_wasm::traits::{Context, HttpContext, RootContext};
use proxy_wasm::types::ContextType;
use std::collections::HashMap;
use std::rc::Rc;

const WASM_SHIM_VERSION: &str = env!("CARGO_PKG_VERSION");
Expand Down Expand Up @@ -37,11 +39,23 @@ impl RootContext for FilterRoot {

fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> {
debug!("#{} create_http_context", context_id);
let mut service_handlers: HashMap<String, Rc<GrpcServiceHandler>> = HashMap::new();
self.config
.services
.iter()
.for_each(|(extension, service)| {
service_handlers
.entry(extension.clone())
.or_insert(Rc::from(GrpcServiceHandler::new(
Rc::clone(service),
Rc::new(HeaderResolver::new()),
)));
});
Some(Box::new(Filter {
context_id,
config: Rc::clone(&self.config),
response_headers_to_add: Vec::default(),
header_resolver: Rc::new(HeaderResolver::new()),
operation_dispatcher: OperationDispatcher::new(service_handlers),
}))
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod configuration;
mod envoy;
mod filter;
mod glob;
mod operation_dispatcher;
mod policy;
mod policy_index;
mod service;
Expand Down
Loading
Loading