Skip to content

Commit

Permalink
Merge pull request #66 from Kuadrant/refactoring-2
Browse files Browse the repository at this point in the history
Refactoring for external auth. Vol II
  • Loading branch information
didierofrivia authored Aug 20, 2024
2 parents c19464b + 3011881 commit cebb46d
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 27 deletions.
36 changes: 9 additions & 27 deletions src/filter/http_context.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use crate::configuration::{FailureMode, FilterConfig};
use crate::envoy::{RateLimitRequest, RateLimitResponse, RateLimitResponse_Code};
use crate::envoy::{RateLimitResponse, RateLimitResponse_Code};
use crate::filter::http_context::TracingHeader::{Baggage, Traceparent, Tracestate};
use crate::policy::Policy;
use crate::service::rate_limit::RateLimitService;
use crate::service::Service;
use log::{debug, warn};
use protobuf::Message;
use proxy_wasm::traits::{Context, HttpContext};
use proxy_wasm::types::{Action, Bytes};
use std::rc::Rc;
use std::time::Duration;

const RATELIMIT_SERVICE_NAME: &str = "envoy.service.ratelimit.v3.RateLimitService";
const RATELIMIT_METHOD_NAME: &str = "ShouldRateLimit";

// tracing headers
#[derive(Clone)]
pub enum TracingHeader {
Traceparent,
Tracestate,
Expand All @@ -24,7 +23,7 @@ impl TracingHeader {
[Traceparent, Tracestate, Baggage]
}

fn as_str(&self) -> &'static str {
pub fn as_str(&self) -> &'static str {
match self {
Traceparent => "traceparent",
Tracestate => "tracestate",
Expand Down Expand Up @@ -64,27 +63,10 @@ impl Filter {
return Action::Continue;
}

let mut rl_req = RateLimitRequest::new();
rl_req.set_domain(rlp.domain.clone());
rl_req.set_hits_addend(1);
rl_req.set_descriptors(descriptors);

let rl_req_serialized = Message::write_to_bytes(&rl_req).unwrap(); // TODO(rahulanand16nov): Error Handling

let rl_tracing_headers = self
.tracing_headers
.iter()
.map(|(header, value)| (header.as_str(), value.as_slice()))
.collect();

match self.dispatch_grpc_call(
rlp.service.as_str(),
RATELIMIT_SERVICE_NAME,
RATELIMIT_METHOD_NAME,
rl_tracing_headers,
Some(&rl_req_serialized),
Duration::from_secs(5),
) {
let rls = RateLimitService::new(rlp.service.as_str(), self.tracing_headers.clone());
let message = RateLimitService::message(rlp.domain.clone(), descriptors);

match rls.send(message) {
Ok(call_id) => {
debug!(
"#{} initiated gRPC call (id# {}) to Limitador",
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod filter;
mod glob;
mod policy;
mod policy_index;
mod service;

#[cfg(test)]
mod tests {
Expand Down
8 changes: 8 additions & 0 deletions src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pub(crate) mod rate_limit;

use protobuf::Message;
use proxy_wasm::types::Status;

pub trait Service<M: Message> {
fn send(&self, message: M) -> Result<u32, Status>;
}
113 changes: 113 additions & 0 deletions src/service/rate_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use crate::envoy::{RateLimitDescriptor, RateLimitRequest};
use crate::filter::http_context::TracingHeader;
use crate::service::Service;
use protobuf::{Message, RepeatedField};
use proxy_wasm::hostcalls::dispatch_grpc_call;
use proxy_wasm::types::{Bytes, Status};
use std::time::Duration;

const RATELIMIT_SERVICE_NAME: &str = "envoy.service.ratelimit.v3.RateLimitService";
const RATELIMIT_METHOD_NAME: &str = "ShouldRateLimit";
pub struct RateLimitService {
endpoint: String,
tracing_headers: Vec<(TracingHeader, Bytes)>,
}

impl RateLimitService {
pub fn new(endpoint: &str, metadata: Vec<(TracingHeader, Bytes)>) -> RateLimitService {
Self {
endpoint: String::from(endpoint),
tracing_headers: metadata,
}
}
pub fn message(
domain: String,
descriptors: RepeatedField<RateLimitDescriptor>,
) -> RateLimitRequest {
RateLimitRequest {
domain,
descriptors,
hits_addend: 1,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}

fn grpc_call(
upstream_name: &str,
initial_metadata: Vec<(&str, &[u8])>,
message: RateLimitRequest,
) -> Result<u32, Status> {
let msg = Message::write_to_bytes(&message).unwrap(); // TODO(didierofrivia): Error Handling
dispatch_grpc_call(
upstream_name,
RATELIMIT_SERVICE_NAME,
RATELIMIT_METHOD_NAME,
initial_metadata,
Some(&msg),
Duration::from_secs(5),
)
}

impl Service<RateLimitRequest> for RateLimitService {
fn send(&self, message: RateLimitRequest) -> Result<u32, Status> {
grpc_call(
self.endpoint.as_str(),
self.tracing_headers
.iter()
.map(|(header, value)| (header.as_str(), value.as_slice()))
.collect(),
message,
)
}
}

#[cfg(test)]
mod tests {
use crate::envoy::{RateLimitDescriptor, RateLimitDescriptor_Entry, RateLimitRequest};
use crate::service::rate_limit::RateLimitService;
//use crate::service::Service;
use protobuf::{CachedSize, RepeatedField, UnknownFields};
//use proxy_wasm::types::Status;
//use crate::filter::http_context::{Filter};

fn build_message() -> RateLimitRequest {
let domain = "rlp1";
let mut field = RateLimitDescriptor::new();
let mut entry = RateLimitDescriptor_Entry::new();
entry.set_key("key1".to_string());
entry.set_value("value1".to_string());
field.set_entries(RepeatedField::from_vec(vec![entry]));
let descriptors = RepeatedField::from_vec(vec![field]);

RateLimitService::message(domain.to_string(), descriptors.clone())
}
#[test]
fn builds_correct_message() {
let msg = build_message();

assert_eq!(msg.hits_addend, 1);
assert_eq!(msg.domain, "rlp1".to_string());
assert_eq!(msg.descriptors.first().unwrap().entries[0].key, "key1");
assert_eq!(msg.descriptors.first().unwrap().entries[0].value, "value1");
assert_eq!(msg.unknown_fields, UnknownFields::default());
assert_eq!(msg.cached_size, CachedSize::default());
}
/*#[test]
fn sends_message() {
let msg = build_message();
let metadata = vec![("header-1", "value-1".as_bytes())];
let rls = RateLimitService::new("limitador-cluster", metadata);
// TODO(didierofrivia): When we have a grpc response type, assert the async response
}
fn grpc_call(
_upstream_name: &str,
_initial_metadata: Vec<(&str, &[u8])>,
_message: RateLimitRequest,
) -> Result<u32, Status> {
Ok(1)
} */
}

0 comments on commit cebb46d

Please sign in to comment.