Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stdout exporter for span and metrics #6750

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregationUtil;
import java.net.URI;
Expand Down Expand Up @@ -96,5 +98,95 @@
}
}

/**
* Invoke the {@code aggregationTemporalitySelectorConsumer} with the configured {@link
* AggregationTemporality}.
*/
public static void configureOtlpAggregationTemporality(
ConfigProperties config,
Consumer<AggregationTemporalitySelector> aggregationTemporalitySelectorConsumer) {
String temporalityStr = config.getString("otel.exporter.otlp.metrics.temporality.preference");
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
if (temporalityStr == null) {
return;
}
AggregationTemporalitySelector temporalitySelector;
switch (temporalityStr.toLowerCase(Locale.ROOT)) {
case "cumulative":
temporalitySelector = AggregationTemporalitySelector.alwaysCumulative();
break;
case "delta":
temporalitySelector = AggregationTemporalitySelector.deltaPreferred();
break;
case "lowmemory":
temporalitySelector = AggregationTemporalitySelector.lowMemory();
break;

Check warning on line 122 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/ExporterBuilderUtil.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/ExporterBuilderUtil.java#L121-L122

Added lines #L121 - L122 were not covered by tests
default:
throw new ConfigurationException("Unrecognized aggregation temporality: " + temporalityStr);
}
aggregationTemporalitySelectorConsumer.accept(temporalitySelector);
}

public static void configureOtlpAggregationTemporality(
StructuredConfigProperties config,
Consumer<AggregationTemporalitySelector> aggregationTemporalitySelectorConsumer) {
String temporalityStr = config.getString("temporality_preference");
if (temporalityStr == null) {
return;
}
AggregationTemporalitySelector temporalitySelector;
switch (temporalityStr.toLowerCase(Locale.ROOT)) {
case "cumulative":
temporalitySelector = AggregationTemporalitySelector.alwaysCumulative();
break;
case "delta":
temporalitySelector = AggregationTemporalitySelector.deltaPreferred();
break;
case "lowmemory":
temporalitySelector = AggregationTemporalitySelector.lowMemory();
break;

Check warning on line 146 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/ExporterBuilderUtil.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/ExporterBuilderUtil.java#L145-L146

Added lines #L145 - L146 were not covered by tests
default:
throw new ConfigurationException("Unrecognized temporality_preference: " + temporalityStr);

Check warning on line 148 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/ExporterBuilderUtil.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/ExporterBuilderUtil.java#L148

Added line #L148 was not covered by tests
}
aggregationTemporalitySelectorConsumer.accept(temporalitySelector);
}

/**
* Invoke the {@code defaultAggregationSelectorConsumer} with the configured {@link
* DefaultAggregationSelector}.
*/
public static void configureOtlpHistogramDefaultAggregation(
ConfigProperties config,
Consumer<DefaultAggregationSelector> defaultAggregationSelectorConsumer) {
String defaultHistogramAggregation =
config.getString("otel.exporter.otlp.metrics.default.histogram.aggregation");
if (defaultHistogramAggregation != null) {
configureHistogramDefaultAggregation(
defaultHistogramAggregation, defaultAggregationSelectorConsumer);
}
}

/**
* Invoke the {@code defaultAggregationSelectorConsumer} with the configured {@link
* DefaultAggregationSelector}.
*/
public static void configureOtlpHistogramDefaultAggregation(
StructuredConfigProperties config,
Consumer<DefaultAggregationSelector> defaultAggregationSelectorConsumer) {
String defaultHistogramAggregation = config.getString("default_histogram_aggregation");
if (defaultHistogramAggregation == null) {
return;
}
if (AggregationUtil.aggregationName(Aggregation.base2ExponentialBucketHistogram())
.equalsIgnoreCase(defaultHistogramAggregation)) {
defaultAggregationSelectorConsumer.accept(
DefaultAggregationSelector.getDefault()
.with(InstrumentType.HISTOGRAM, Aggregation.base2ExponentialBucketHistogram()));
} else if (!AggregationUtil.aggregationName(explicitBucketHistogram())
.equalsIgnoreCase(defaultHistogramAggregation)) {
throw new ConfigurationException(

Check warning on line 186 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/ExporterBuilderUtil.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/ExporterBuilderUtil.java#L186

Added line #L186 was not covered by tests
"Unrecognized default_histogram_aggregation: " + defaultHistogramAggregation);
}
}

private ExporterBuilderUtil() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,14 @@

package io.opentelemetry.exporter.logging.otlp;

import static io.opentelemetry.exporter.logging.otlp.internal.writer.JsonUtil.JSON_FACTORY;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.io.SegmentedStringWriter;
import io.opentelemetry.exporter.internal.otlp.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.logging.otlp.internal.writer.JsonUtil;
import io.opentelemetry.exporter.logging.otlp.internal.metrics.OtlpStdoutMetricExporter;
import io.opentelemetry.exporter.logging.otlp.internal.metrics.OtlpStdoutMetricExporterBuilder;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
Expand All @@ -31,30 +24,49 @@ public final class OtlpJsonLoggingMetricExporter implements MetricExporter {
private static final Logger logger =
Logger.getLogger(OtlpJsonLoggingMetricExporter.class.getName());

private final AtomicBoolean isShutdown = new AtomicBoolean();

private final AggregationTemporality aggregationTemporality;

private final OtlpStdoutMetricExporter delegate;

/**
* Returns a new {@link OtlpJsonLoggingMetricExporter} with a aggregation temporality of {@link
* AggregationTemporality#CUMULATIVE}.
*/
public static MetricExporter create() {
return new OtlpJsonLoggingMetricExporter(AggregationTemporality.CUMULATIVE);
return create(AggregationTemporality.CUMULATIVE);
}

/**
* Returns a new {@link OtlpJsonLoggingMetricExporter} with the given {@code
* aggregationTemporality}.
*/
public static MetricExporter create(AggregationTemporality aggregationTemporality) {
return new OtlpJsonLoggingMetricExporter(aggregationTemporality);
OtlpStdoutMetricExporter delegate =
new OtlpStdoutMetricExporterBuilder(logger).setWrapperJsonObject(false).build();
return new OtlpJsonLoggingMetricExporter(delegate, aggregationTemporality);
}

private OtlpJsonLoggingMetricExporter(AggregationTemporality aggregationTemporality) {
OtlpJsonLoggingMetricExporter(
OtlpStdoutMetricExporter delegate, AggregationTemporality aggregationTemporality) {
this.delegate = delegate;
this.aggregationTemporality = aggregationTemporality;
}

@Override
public CompletableResultCode export(Collection<MetricData> logs) {
return delegate.export(logs);
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}

/**
* Return the aggregation temporality.
*
Expand All @@ -69,41 +81,4 @@ public AggregationTemporality getPreferredTemporality() {
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return aggregationTemporality;
}

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}

ResourceMetricsMarshaler[] allResourceMetrics = ResourceMetricsMarshaler.create(metrics);
for (ResourceMetricsMarshaler resourceMetrics : allResourceMetrics) {
SegmentedStringWriter sw = new SegmentedStringWriter(JSON_FACTORY._getBufferRecycler());
try (JsonGenerator gen = JsonUtil.create(sw)) {
resourceMetrics.writeJsonTo(gen);
} catch (IOException e) {
// Shouldn't happen in practice, just skip it.
continue;
}
try {
logger.log(Level.INFO, sw.getAndClear());
} catch (IOException e) {
logger.log(Level.WARNING, "Unable to read OTLP JSON metrics", e);
}
}
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
}
return CompletableResultCode.ofSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,12 @@

package io.opentelemetry.exporter.logging.otlp;

import static io.opentelemetry.exporter.logging.otlp.internal.writer.JsonUtil.JSON_FACTORY;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.io.SegmentedStringWriter;
import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler;
import io.opentelemetry.exporter.logging.otlp.internal.writer.JsonUtil;
import io.opentelemetry.exporter.logging.otlp.internal.traces.OtlpStdoutSpanExporter;
import io.opentelemetry.exporter.logging.otlp.internal.traces.OtlpStdoutSpanExporterBuilder;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
Expand All @@ -29,49 +22,31 @@ public final class OtlpJsonLoggingSpanExporter implements SpanExporter {
private static final Logger logger =
Logger.getLogger(OtlpJsonLoggingSpanExporter.class.getName());

private final AtomicBoolean isShutdown = new AtomicBoolean();
private final OtlpStdoutSpanExporter delegate;

/** Returns a new {@link OtlpJsonLoggingSpanExporter}. */
public static SpanExporter create() {
return new OtlpJsonLoggingSpanExporter();
OtlpStdoutSpanExporter delegate =
new OtlpStdoutSpanExporterBuilder(logger).setWrapperJsonObject(false).build();
return new OtlpJsonLoggingSpanExporter(delegate);
}

private OtlpJsonLoggingSpanExporter() {}
OtlpJsonLoggingSpanExporter(OtlpStdoutSpanExporter delegate) {
this.delegate = delegate;
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}

ResourceSpansMarshaler[] allResourceSpans = ResourceSpansMarshaler.create(spans);
for (ResourceSpansMarshaler resourceSpans : allResourceSpans) {
SegmentedStringWriter sw = new SegmentedStringWriter(JSON_FACTORY._getBufferRecycler());
try (JsonGenerator gen = JsonUtil.create(sw)) {
resourceSpans.writeJsonTo(gen);
} catch (IOException e) {
// Shouldn't happen in practice, just skip it.
continue;
}
try {
logger.log(Level.INFO, sw.getAndClear());
} catch (IOException e) {
logger.log(Level.WARNING, "Unable to read OTLP JSON spans", e);
}
}
return CompletableResultCode.ofSuccess();
public CompletableResultCode export(Collection<SpanData> logs) {
return delegate.export(logs);
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
}
return CompletableResultCode.ofSuccess();
return delegate.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.logging.otlp.internal;
package io.opentelemetry.exporter.logging.otlp.internal.metrics;

import io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingMetricExporter;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
Expand All @@ -17,6 +17,7 @@
* at any time.
*/
public class LoggingMetricExporterProvider implements ConfigurableMetricExporterProvider {

@Override
public MetricExporter createExporter(ConfigProperties config) {
return OtlpJsonLoggingMetricExporter.create();
Expand Down
Loading
Loading