Skip to content

Commit

Permalink
fix(config): allow usage of metrics-only decoders in log sources (vec…
Browse files Browse the repository at this point in the history
…tordotdev#21040)

* fix: allow usage of metrics-only decoder in log sources

* docs: Add changelog

* refactor(config): Refactor schema definition construction

* Update 21039_allow-usage-of-metrics-only-decoders.fix.md

* test(config): Add tests for SourceOutput::new_maybe_log

* test(config): Add tests for SourceOutput::new_maybe_log
  • Loading branch information
jorgehermo9 authored and AndrooTheChen committed Sep 23, 2024
1 parent eaebc9c commit 2224216
Show file tree
Hide file tree
Showing 39 changed files with 107 additions and 59 deletions.
3 changes: 3 additions & 0 deletions changelog.d/21039_allow-usage-of-metrics-only-decoders.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Log sources can now use metrics-only decoders such as the recently added `influxdb` decoder.

authors: jorgehermo9
32 changes: 22 additions & 10 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,18 @@ pub struct SourceOutput {

impl SourceOutput {
/// Create a `SourceOutput` of the given data type that contains a single output `Definition`.
/// If the data type does not contain logs, the schema definition will be ignored.
/// Designed for use in log sources.
///
///
/// # Panics
///
/// Panics if `ty` does not contain [`DataType::Log`].
#[must_use]
pub fn new_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
assert!(ty.contains(DataType::Log));
pub fn new_maybe_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
let schema_definition = ty
.contains(DataType::Log)
.then(|| Arc::new(schema_definition));

Self {
port: None,
ty,
schema_definition: Some(Arc::new(schema_definition)),
schema_definition,
}
}

Expand Down Expand Up @@ -573,7 +571,7 @@ mod test {
let definition = schema::Definition::empty_legacy_namespace()
.with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
.with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
let output = SourceOutput::new_logs(DataType::Log, definition);
let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

let valid_event = LogEvent::from(Value::from(btreemap! {
"zork" => "norknoog",
Expand Down Expand Up @@ -619,7 +617,7 @@ mod test {
)
.with_event_field(&owned_value_path!("nork"), Kind::integer(), None);

let output = SourceOutput::new_logs(DataType::Log, definition);
let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

let mut valid_event = LogEvent::from(Value::from(btreemap! {
"nork" => 32
Expand Down Expand Up @@ -673,4 +671,18 @@ mod test {
new_definition.assert_valid_for_event(&valid_event);
new_definition.assert_valid_for_event(&invalid_event);
}

#[test]
fn test_new_log_source_ignores_definition_with_metric_data_type() {
let definition = schema::Definition::any();
let output = SourceOutput::new_maybe_logs(DataType::Metric, definition);
assert_eq!(output.schema_definition(true), None);
}

#[test]
fn test_new_log_source_uses_definition_with_log_data_type() {
let definition = schema::Definition::any();
let output = SourceOutput::new_maybe_logs(DataType::Log, definition.clone());
assert_eq!(output.schema_definition(true), Some(definition));
}
}
8 changes: 4 additions & 4 deletions src/config/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ mod test {
outputs: vec![match ty {
DataType::Metric => SourceOutput::new_metrics(),
DataType::Trace => SourceOutput::new_traces(),
_ => SourceOutput::new_logs(ty, Definition::any()),
_ => SourceOutput::new_maybe_logs(ty, Definition::any()),
}],
},
);
Expand Down Expand Up @@ -639,7 +639,7 @@ mod test {
graph.nodes.insert(
ComponentKey::from("foo.bar"),
Node::Source {
outputs: vec![SourceOutput::new_logs(
outputs: vec![SourceOutput::new_maybe_logs(
DataType::all_bits(),
Definition::any(),
)],
Expand All @@ -648,7 +648,7 @@ mod test {
graph.nodes.insert(
ComponentKey::from("foo.bar"),
Node::Source {
outputs: vec![SourceOutput::new_logs(
outputs: vec![SourceOutput::new_maybe_logs(
DataType::all_bits(),
Definition::any(),
)],
Expand Down Expand Up @@ -676,7 +676,7 @@ mod test {
graph.nodes.insert(
ComponentKey::from("baz.errors"),
Node::Source {
outputs: vec![SourceOutput::new_logs(
outputs: vec![SourceOutput::new_maybe_logs(
DataType::all_bits(),
Definition::any(),
)],
Expand Down
4 changes: 2 additions & 2 deletions src/config/unit_test/unit_test_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl SourceConfig for UnitTestSourceConfig {
}

fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
DataType::all_bits(),
schema::Definition::default_legacy_namespace(),
)]
Expand Down Expand Up @@ -103,7 +103,7 @@ impl SourceConfig for UnitTestStreamSourceConfig {
}

fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
DataType::all_bits(),
schema::Definition::default_legacy_namespace(),
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl SourceConfig for AmqpSourceConfig {
None,
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_kinesis_firehose/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl SourceConfig for AwsKinesisFirehoseConfig {
None,
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl SourceConfig for AwsS3Config {
schema_definition = schema_definition.unknown_fields(Kind::bytes());
}

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_sqs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl SourceConfig for AwsSqsConfig {
Some("timestamp"),
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
7 changes: 5 additions & 2 deletions src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ impl SourceConfig for DatadogAgentConfig {

if self.multiple_outputs {
if !self.disable_logs {
output.push(SourceOutput::new_logs(DataType::Log, definition).with_port(LOGS))
output.push(SourceOutput::new_maybe_logs(DataType::Log, definition).with_port(LOGS))
}
if !self.disable_metrics {
output.push(SourceOutput::new_metrics().with_port(METRICS))
Expand All @@ -303,7 +303,10 @@ impl SourceConfig for DatadogAgentConfig {
output.push(SourceOutput::new_traces().with_port(TRACES))
}
} else {
output.push(SourceOutput::new_logs(DataType::all_bits(), definition))
output.push(SourceOutput::new_maybe_logs(
DataType::all_bits(),
definition,
))
}
output
}
Expand Down
2 changes: 1 addition & 1 deletion src/sources/demo_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ impl SourceConfig for DemoLogsConfig {
Some("service"),
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
5 changes: 4 additions & 1 deletion src/sources/dnstap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ impl SourceConfig for DnstapConfig {
let schema_definition = self
.schema_definition(log_namespace)
.with_standard_vector_source_metadata();
vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
5 changes: 4 additions & 1 deletion src/sources/docker_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ impl SourceConfig for DockerLogsConfig {
None,
);

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl SourceConfig for ExecConfig {
None,
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
5 changes: 4 additions & 1 deletion src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,10 @@ impl SourceConfig for FileConfig {
None,
);

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/file_descriptors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ fn outputs(
)
.with_standard_vector_source_metadata();

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
decoding.output_type(),
schema_definition,
)]
Expand Down
5 changes: 4 additions & 1 deletion src/sources/fluent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ impl SourceConfig for FluentConfig {
let log_namespace = global_log_namespace.merge(self.log_namespace);
let schema_definition = self.schema_definition(log_namespace);

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn resources(&self) -> Vec<Resource> {
Expand Down
5 changes: 4 additions & 1 deletion src/sources/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,10 @@ impl SourceConfig for PubsubConfig {
None,
);

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/heroku_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl SourceConfig for LogplexConfig {
// There is a global and per-source `log_namespace` config.
// The source config overrides the global setting and is merged here.
let schema_def = self.schema_definition(global_log_namespace.merge(self.log_namespace));
vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_def,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl SourceConfig for HttpClientConfig {
.schema_definition(log_namespace)
.with_standard_vector_source_metadata();

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ impl SourceConfig for SimpleHttpConfig {

let schema_definition = self.schema_definition(log_namespace);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding
.as_ref()
.map(|d| d.output_type())
Expand Down
5 changes: 4 additions & 1 deletion src/sources/internal_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ impl SourceConfig for InternalLogsConfig {
let schema_definition =
self.schema_definition(global_log_namespace.merge(self.log_namespace));

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
5 changes: 4 additions & 1 deletion src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,10 @@ impl SourceConfig for JournaldConfig {
let schema_definition =
self.schema_definition(global_log_namespace.merge(self.log_namespace));

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ impl SourceConfig for KafkaSourceConfig {
None,
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
5 changes: 4 additions & 1 deletion src/sources/kubernetes_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,10 @@ impl SourceConfig for Config {
)
.with_standard_vector_source_metadata();

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/logstash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl SourceConfig for LogstashConfig {
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
// There is a global and per-source `log_namespace` config.
// The source config overrides the global setting and is merged here.
vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
DataType::Log,
self.schema_definition(global_log_namespace.merge(self.log_namespace)),
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl SourceConfig for NatsSourceConfig {
None,
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/opentelemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl SourceConfig for OpentelemetryConfig {
};

vec![
SourceOutput::new_logs(DataType::Log, schema_definition).with_port(LOGS),
SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS),
SourceOutput::new_traces().with_port(TRACES),
]
}
Expand Down
2 changes: 1 addition & 1 deletion src/sources/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl SourceConfig for PulsarSourceConfig {
Kind::bytes(),
Some("producer_name"),
);
vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl SourceConfig for RedisSourceConfig {
)
.with_standard_vector_source_metadata();

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl SourceConfig for SocketConfig {
}
};

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding().output_type(),
schema_definition,
)]
Expand Down
Loading

0 comments on commit 2224216

Please sign in to comment.