Skip to content

Commit

Permalink
refactor: introduce interface to export metrics
Browse files Browse the repository at this point in the history
Signed-off-by: inge4pres <[email protected]>
  • Loading branch information
inge4pres committed Oct 15, 2024
1 parent 0c4da2f commit f0753e2
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 49 deletions.
84 changes: 51 additions & 33 deletions src/metrics/exporter.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,59 @@ const pbcommon = @import("../opentelemetry/proto/common/v1.pb.zig");
const reader = @import("reader.zig");
const MetricExporter = reader.MetricExporter;
const MetricReadError = reader.MetricReadError;
const ExportFn = reader.ExportFn;

pub fn ImMemoryExporter(allocator: std.mem.Allocator) type {
return struct {
const Self = @This();

var global: std.ArrayList(pbmetrics.ResourceMetrics) = std.ArrayList(pbmetrics.ResourceMetrics).init(allocator);

pub fn exporter() *const ExportFn {
return Self.exportBatch;
}

fn exportBatch(metrics: pbmetrics.MetricsData) MetricReadError!void {
Self.global.clearRetainingCapacity();
Self.global.appendSlice(metrics.resource_metrics.items) catch |e| {
std.debug.print("error exporting to memory, allocation error: {?}", .{e});
return MetricReadError.ExportFailed;
};
return;
}

pub fn fetch() []pbmetrics.ResourceMetrics {
return Self.global.items;
}

pub fn deinit() void {
Self.global.deinit();
}
};
}

/// ExporterIface is the type representing the interface for exporting metrics.
/// Implementations can be achieved by any type by having a member field of type
/// ExporterIface and a member function exporttBatch with the same signature.
pub const ExporterIface = struct {
exportFn: *const fn (*ExporterIface, pbmetrics.MetricsData) MetricReadError!void,

pub fn exportBatch(self: *ExporterIface, data: pbmetrics.MetricsData) MetricReadError!void {
return self.exportFn(self, data);
}
};

pub const ImMemoryExporter = struct {
const Self = @This();
allocator: std.mem.Allocator,
data: std.ArrayList(pbmetrics.ResourceMetrics) = undefined,
// Implement the interface via @fieldParentPtr
exporter: ExporterIface,

pub fn init(allocator: std.mem.Allocator) Self {
return Self{
.allocator = allocator,
.data = std.ArrayList(pbmetrics.ResourceMetrics).init(allocator),
.exporter = ExporterIface{
.exportFn = exportBatch,
},
};
}
pub fn deinit(self: *Self) void {
self.data.deinit();
}

fn exportBatch(iface: *ExporterIface, metrics: pbmetrics.MetricsData) MetricReadError!void {
const self: *Self = @fieldParentPtr("exporter", iface);

self.data.clearRetainingCapacity();
self.data.appendSlice(metrics.resource_metrics.items) catch |e| {
std.debug.print("error exporting to memory, allocation error: {?}", .{e});
return MetricReadError.ExportFailed;
};
return;
}

pub fn fetch(self: Self) []pbmetrics.ResourceMetrics {
return self.data.items;
}
};

test "in memory exporter stores data" {
const inMemExporter = ImMemoryExporter(std.testing.allocator);
var inMemExporter = ImMemoryExporter.init(std.testing.allocator);
defer inMemExporter.deinit();

const exporter = MetricExporter.new(inMemExporter.exporter());
const exporter = MetricExporter.new(&inMemExporter.exporter);

const howMany: usize = 2;
const dp = try std.testing.allocator.alloc(pbmetrics.NumberDataPoint, howMany);
Expand Down Expand Up @@ -84,7 +101,8 @@ test "in memory exporter stores data" {
defer metricsData.deinit();
try metricsData.resource_metrics.append(resource);

try exporter.exporter(metricsData);
const result = exporter.exportBatch(metricsData);
std.debug.assert(result == .Success);
const data = inMemExporter.fetch();

std.debug.assert(data.len == 1);
Expand Down
38 changes: 22 additions & 16 deletions src/metrics/reader.zig
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub const MetricReader = struct {
// Exporter is the destination of the metrics data.
// FIXME
// the default metric exporter should be the PeriodicExporter
exporter: MetricExporter = MetricExporter.new(noopExporter),
exporter: MetricExporter = undefined,

const Self = @This();

Expand Down Expand Up @@ -212,9 +212,11 @@ test "metric reader shutdown prevents collect() to execute" {
test "metric reader collects data from meter provider" {
var mp = try MeterProvider.init(std.testing.allocator);
defer mp.shutdown();

var noop = Exporter{ .exportFn = noopExporter };
var reader = MetricReader{
.allocator = std.testing.allocator,
.exporter = MetricExporter.new(noopExporter),
.exporter = MetricExporter.new(&noop),
};
defer reader.shutdown();

Expand Down Expand Up @@ -253,12 +255,12 @@ test "metric reader custom temporality" {
var mp = try MeterProvider.init(std.testing.allocator);
defer mp.shutdown();

const inMem = InMemoryExporter(std.testing.allocator);
var inMem = InMemoryExporter.init(std.testing.allocator);
defer inMem.deinit();

var reader = MetricReader{
.allocator = std.testing.allocator,
.exporter = MetricExporter.new(inMem.exporter()),
.exporter = MetricExporter.new(&inMem.exporter),
.temporality = deltaTemporality,
};
defer reader.shutdown();
Expand All @@ -281,16 +283,16 @@ pub const ExportResult = enum {
Failure,
};

pub const ExportFn = fn (pbmetrics.MetricsData) MetricReadError!void;
const Exporter = @import("exporter.zig").ExporterIface;

pub const MetricExporter = struct {
const Self = @This();
exporter: *const ExportFn,
exporter: *Exporter,
hasShutDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),

var exportCompleted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false);

pub fn new(exporter: *const ExportFn) Self {
pub fn new(exporter: *Exporter) Self {
return Self{
.exporter = exporter,
};
Expand All @@ -307,7 +309,7 @@ pub const MetricExporter = struct {
defer exportCompleted.store(true, .release);

// Call the exporter function to process metrics data.
self.exporter(metrics) catch |e| {
self.exporter.exportBatch(metrics) catch |e| {
std.debug.print("MetricExporter exportBatch failed: {?}\n", .{e});
return ExportResult.Failure;
};
Expand All @@ -331,26 +333,26 @@ pub const MetricExporter = struct {
};

// test harness to build a noop exporter.
fn noopExporter(_: pbmetrics.MetricsData) MetricReadError!void {
fn noopExporter(_: *Exporter, _: pbmetrics.MetricsData) MetricReadError!void {
return;
}
// mocked metric exporter to assert metrics data are read once exported.
fn mockExporter(metrics: pbmetrics.MetricsData) MetricReadError!void {
fn mockExporter(_: *Exporter, metrics: pbmetrics.MetricsData) MetricReadError!void {
if (metrics.resource_metrics.items.len != 1) {
return MetricReadError.ExportFailed;
} // only one resource metrics is expected in this mock
}

// test harness to build an exporter that times out.
fn waiterExporter(_: pbmetrics.MetricsData) MetricReadError!void {
fn waiterExporter(_: *Exporter, _: pbmetrics.MetricsData) MetricReadError!void {
// Sleep for 1 second to simulate a slow exporter.
std.time.sleep(std.time.ns_per_ms * 1000);
return;
}

test "build no-op metric exporter" {
const exporter: *const ExportFn = noopExporter;
var me = MetricExporter.new(exporter);
var noop = Exporter{ .exportFn = noopExporter };
var me = MetricExporter.new(&noop);

const metrics = pbmetrics.MetricsData{
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator),
Expand All @@ -363,7 +365,9 @@ test "build no-op metric exporter" {
test "exported metrics by calling metric reader" {
var mp = try MeterProvider.init(std.testing.allocator);
defer mp.shutdown();
const me = MetricExporter.new(mockExporter);

var mock = Exporter{ .exportFn = mockExporter };
const me = MetricExporter.new(&mock);

var reader = MetricReader{ .allocator = std.testing.allocator, .exporter = me };
defer reader.shutdown();
Expand All @@ -380,7 +384,8 @@ test "exported metrics by calling metric reader" {
}

test "metric exporter force flush succeeds" {
var me = MetricExporter.new(noopExporter);
var noop = Exporter{ .exportFn = noopExporter };
var me = MetricExporter.new(&noop);

const metrics = pbmetrics.MetricsData{
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator),
Expand All @@ -397,7 +402,8 @@ fn backgroundRunner(me: *MetricExporter, metrics: pbmetrics.MetricsData) !void {
}

test "metric exporter force flush fails" {
var me = MetricExporter.new(waiterExporter);
var wait = Exporter{ .exportFn = waiterExporter };
var me = MetricExporter.new(&wait);

const metrics = pbmetrics.MetricsData{
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator),
Expand Down

0 comments on commit f0753e2

Please sign in to comment.