diff --git a/lib/vector-common/src/internal_event/cached_event.rs b/lib/vector-common/src/internal_event/cached_event.rs index e672848c93584..daa3f8ea803af 100644 --- a/lib/vector-common/src/internal_event/cached_event.rs +++ b/lib/vector-common/src/internal_event/cached_event.rs @@ -17,8 +17,9 @@ use super::{InternalEventHandle, RegisterInternalEvent}; /// new event is emitted for a previously unseen set of tags an event is registered /// and stored in the cache. #[derive(Derivative)] -#[derivative(Clone(bound = ""), Default(bound = ""))] -pub struct RegisteredEventCache { +#[derivative(Clone(bound = "T: Clone"))] +pub struct RegisteredEventCache { + fixed_tags: T, cache: Arc< RwLock< BTreeMap< @@ -36,16 +37,31 @@ pub trait RegisterTaggedInternalEvent: RegisterInternalEvent { /// that will be used when registering the event. type Tags; - fn register(tags: Self::Tags) -> ::Handle; + /// The type that contains data necessary to extract the tags that will + /// be fixed and only need setting up front when the cache is first created. + type Fixed; + + fn register(fixed: Self::Fixed, tags: Self::Tags) -> ::Handle; } -impl RegisteredEventCache +impl RegisteredEventCache where Data: Sized, EventHandle: InternalEventHandle, Tags: Ord + Clone, - Event: RegisterInternalEvent + RegisterTaggedInternalEvent, + FixedTags: Clone, + Event: RegisterInternalEvent + + RegisterTaggedInternalEvent, { + /// Create a new event cache with a set of fixed tags. These tags are passed to + /// all registered events. + pub fn new(fixed_tags: FixedTags) -> Self { + Self { + fixed_tags, + cache: Arc::default(), + } + } + /// Emits the event with the given tags. /// It will register the event and store in the cache if this has not already /// been done. @@ -58,7 +74,10 @@ where if let Some(event) = read.get(tags) { event.emit(value); } else { - let event = ::register(tags.clone()); + let event = ::register( + self.fixed_tags.clone(), + tags.clone(), + ); event.emit(value); // Ensure the read lock is dropped so we can write. @@ -67,3 +86,43 @@ where } } } + +#[cfg(test)] +mod tests { + #![allow(unreachable_pub)] + use metrics::{register_counter, Counter}; + + use super::*; + + crate::registered_event!( + TestEvent { + fixed: String, + dynamic: String, + } => { + event: Counter = { + register_counter!("test_event_total", "fixed" => self.fixed, "dynamic" => self.dynamic) + }, + } + + fn emit(&self, count: u64) { + self.event.increment(count); + } + + fn register(fixed: String, dynamic: String) { + crate::internal_event::register(TestEvent { + fixed, + dynamic, + }) + } + ); + + #[test] + fn test_fixed_tag() { + let event: RegisteredEventCache = + RegisteredEventCache::new("fixed".to_string()); + + for tag in 1..=5 { + event.emit(&format!("dynamic{tag}"), tag); + } + } +} diff --git a/lib/vector-common/src/internal_event/events_sent.rs b/lib/vector-common/src/internal_event/events_sent.rs index d12a22bf17e8a..22d614b0016eb 100644 --- a/lib/vector-common/src/internal_event/events_sent.rs +++ b/lib/vector-common/src/internal_event/events_sent.rs @@ -91,7 +91,7 @@ crate::registered_event!( self.event_bytes.increment(byte_size.get() as u64); } - fn register(tags: EventCountTags) { + fn register(_fixed: (), tags: EventCountTags) { super::register(TaggedEventsSent::new( tags, )) diff --git a/lib/vector-common/src/internal_event/mod.rs b/lib/vector-common/src/internal_event/mod.rs index 7d016e4a9e562..8785fa45757a8 100644 --- a/lib/vector-common/src/internal_event/mod.rs +++ b/lib/vector-common/src/internal_event/mod.rs @@ -219,7 +219,7 @@ macro_rules! registered_event { fn emit(&$slf:ident, $data_name:ident: $data:ident) $emit_body:block - $(fn register($tags_name:ident: $tags:ty) + $(fn register($fixed_name:ident: $fixed_tags:ty, $tags_name:ident: $tags:ty) $register_body:block)? ) => { paste::paste!{ @@ -251,8 +251,10 @@ macro_rules! registered_event { $(impl $crate::internal_event::cached_event::RegisterTaggedInternalEvent for $event { type Tags = $tags; + type Fixed = $fixed_tags; fn register( + $fixed_name: $fixed_tags, $tags_name: $tags, ) -> ::Handle { $register_body diff --git a/lib/vector-common/src/request_metadata.rs b/lib/vector-common/src/request_metadata.rs index 9b93a63df7626..881918c899f8c 100644 --- a/lib/vector-common/src/request_metadata.rs +++ b/lib/vector-common/src/request_metadata.rs @@ -129,9 +129,9 @@ impl GroupedCountByteSize { } /// Emits our counts to a `RegisteredEvent` cached event. - pub fn emit_event(&self, event_cache: &RegisteredEventCache) + pub fn emit_event(&self, event_cache: &RegisteredEventCache<(), T>) where - T: RegisterTaggedInternalEvent, + T: RegisterTaggedInternalEvent, H: InternalEventHandle, { match self { diff --git a/lib/vector-core/src/stream/driver.rs b/lib/vector-core/src/stream/driver.rs index 6ff23014c96d5..1a1ce79e1b27a 100644 --- a/lib/vector-core/src/stream/driver.rs +++ b/lib/vector-core/src/stream/driver.rs @@ -99,7 +99,7 @@ where pin!(batched_input); let bytes_sent = protocol.map(|protocol| register(BytesSent { protocol })); - let events_sent = RegisteredEventCache::default(); + let events_sent = RegisteredEventCache::new(()); loop { // Core behavior of the loop: @@ -203,7 +203,7 @@ where finalizers: EventFinalizers, event_count: usize, bytes_sent: &Option>, - events_sent: &RegisteredEventCache, + events_sent: &RegisteredEventCache<(), TaggedEventsSent>, ) { match result { Err(error) => {