Skip to content

Commit

Permalink
ehanchement(internal_metrics): reuse BytesReceived event for intern…
Browse files Browse the repository at this point in the history
…al metrics (#20977)

Instead of having a completely separate struct for internal metrics bytes received event, we can
reuse the common `BytesReceived` event, defining a new `Protocol` for it. Similar is done for static
metrics.

Related: #20899
  • Loading branch information
esensar authored Jul 31, 2024
1 parent 789848f commit 70e61fc
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 29 deletions.
1 change: 1 addition & 0 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl Protocol {
pub const TCP: Protocol = Protocol(SharedString::const_str("tcp"));
pub const UDP: Protocol = Protocol(SharedString::const_str("udp"));
pub const UNIX: Protocol = Protocol(SharedString::const_str("unix"));
pub const INTERNAL: Protocol = Protocol(SharedString::const_str("internal"));
pub const STATIC: Protocol = Protocol(SharedString::const_str("static"));
}

Expand Down
22 changes: 0 additions & 22 deletions src/internal_events/internal_metrics.rs

This file was deleted.

4 changes: 0 additions & 4 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ mod http_client_source;
mod influxdb;
#[cfg(feature = "sources-internal_logs")]
mod internal_logs;
#[cfg(feature = "sources-internal_metrics")]
mod internal_metrics;
#[cfg(all(unix, feature = "sources-journald"))]
mod journald;
#[cfg(any(feature = "sources-kafka", feature = "sinks-kafka"))]
Expand Down Expand Up @@ -209,8 +207,6 @@ pub(crate) use self::http_client_source::*;
pub(crate) use self::influxdb::*;
#[cfg(feature = "sources-internal_logs")]
pub(crate) use self::internal_logs::*;
#[cfg(feature = "sources-internal_metrics")]
pub(crate) use self::internal_metrics::*;
#[cfg(all(unix, feature = "sources-journald"))]
pub(crate) use self::journald::*;
#[cfg(any(feature = "sources-kafka", feature = "sinks-kafka"))]
Expand Down
9 changes: 6 additions & 3 deletions src/sources/internal_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ use serde_with::serde_as;
use tokio::time;
use tokio_stream::wrappers::IntervalStream;
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _};
use vector_lib::internal_event::{
ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
};
use vector_lib::lookup::lookup_v2::OptionalValuePath;
use vector_lib::{config::LogNamespace, ByteSizeOf, EstimatedJsonEncodedSizeOf};

use crate::{
config::{log_schema, SourceConfig, SourceContext, SourceOutput},
internal_events::{EventsReceived, InternalMetricsBytesReceived, StreamClosedError},
internal_events::{EventsReceived, StreamClosedError},
metrics::Controller,
shutdown::ShutdownSignal,
SourceSender,
Expand Down Expand Up @@ -156,6 +158,7 @@ struct InternalMetrics<'a> {
impl<'a> InternalMetrics<'a> {
async fn run(mut self) -> Result<(), ()> {
let events_received = register!(EventsReceived);
let bytes_received = register!(BytesReceived::from(Protocol::INTERNAL));
let mut interval =
IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
while interval.next().await.is_some() {
Expand All @@ -167,7 +170,7 @@ impl<'a> InternalMetrics<'a> {
let byte_size = metrics.size_of();
let json_size = metrics.estimated_json_encoded_size_of();

emit!(InternalMetricsBytesReceived { byte_size });
bytes_received.emit(ByteSize(byte_size));
events_received.emit(CountByteSize(count, json_size));

let batch = metrics.into_iter().map(|mut metric| {
Expand Down

0 comments on commit 70e61fc

Please sign in to comment.