Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exporters: add otlplogfile exporter #5743

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Add `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile ` experimental logs exporter. (#5743)
Copy link
Member

@pellared pellared Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Add `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile ` experimental logs exporter. (#5743)
- Add `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile` experimental OTLP logs file exporter. (#5743)

### Fixed

- Fix memory leak in the global `MeterProvider` when identical instruments are repeatedly created. (#5754)
Expand Down
3 changes: 3 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# OTLP Log File Exporter

[![PkgGoDev](https://pkg.go.dev/badge/go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile)](https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile)
73 changes: 73 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile"

import (
"errors"
"fmt"
"io"
"os"
"time"
)

// Option configures a field of the configuration or return an error if needed.
type Option func(*config) (*config, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please implement Option as described in https://github.com/open-telemetry/opentelemetry-go/blob/main/CONTRIBUTING.md#option. Consistency makes the maintenance easier for us.


// config contains options for the OTLP Log file exporter.
type config struct {
// Out is the output where the records should be written.
out io.WriteCloser
// Duration represents the interval when the buffer should be flushed.
flushInterval time.Duration
}

func newConfig(options []Option) (*config, error) {
c := &config{
out: os.Stdout,
flushInterval: 5 * time.Second,
}

var configErr error
for _, opt := range options {
if _, err := opt(c); err != nil {
configErr = errors.Join(configErr, err)
}
}

if configErr != nil {
return nil, configErr
}

return c, nil
}

// WithFile configures a file where the records will be exported.
// An error is returned if the file could not be created or opened.
func WithFile(path string) Option {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need such option.

return func(c *config) (*config, error) {
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644)
if err != nil {
return nil, fmt.Errorf("failed to open file: %w", err)
}

return WithWriter(file)(c)
}
}

// WithWriter configures the destination where the exporter should output
// the records. By default, if not specified, stdout is used.
func WithWriter(w io.WriteCloser) Option {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exporter must not be responsible for closing the writer (like e.g. stdoutlog)
Would you want to close stdout? Or what if someone uses one file for multiple purposes?
The caller should be responsible for closing the writer.

return func(c *config) (*config, error) {
c.out = w
return c, nil
}
}

// WithFlushInterval configures the duration after which the buffer is periodically flushed to the output.
func WithFlushInterval(flushInterval time.Duration) Option {
return func(c *config) (*config, error) {
c.flushInterval = flushInterval
return c, nil
}
}
12 changes: 12 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

/*
Package otlplogfile provides an OTLP log exporter that outputs log records to a JSON line file. The exporter uses a buffered
file writer to write log records to file to reduce I/O and improve performance.

All Exporters must be created with [New].

See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/file-exporter.md
*/
package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile"
37 changes: 37 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlplogfile_test

import (
"context"
"time"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/sdk/log"
)

func Example() {
ctx := context.Background()
exp, err := otlplogfile.New(
otlplogfile.WithFile("/tmp/otlp-logs.jsonl"),
otlplogfile.WithFlushInterval(time.Second),
)
if err != nil {
panic(err)
}

processor := log.NewBatchProcessor(exp)
provider := log.NewLoggerProvider(log.WithProcessor(processor))
defer func() {
if err := provider.Shutdown(ctx); err != nil {
panic(err)
}
}()

global.SetLoggerProvider(provider)

// From here, the provider can be used by instrumentation to collect
// telemetry.
}
98 changes: 98 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile"

import (
"context"
"sync"

"google.golang.org/protobuf/encoding/protojson"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal/transform"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile/internal/writer"
"go.opentelemetry.io/otel/sdk/log"
lpb "go.opentelemetry.io/proto/otlp/logs/v1"
)

// Exporter is an OpenTelemetry log exporter that outputs log records
// into files, as JSON. The implementation is based on the specification
// defined here: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.36.0/specification/protocol/file-exporter.md
type Exporter struct {
mu sync.Mutex
w *writer.Writer
stopped bool
}

// Compile-time check that the implementation satisfies the interface.
var _ log.Exporter = &Exporter{}

// New returns a new [Exporter].
func New(options ...Option) (*Exporter, error) {
cfg, err := newConfig(options)
if err != nil {
return nil, err
}

w, err := writer.New(cfg.out, cfg.flushInterval)
if err != nil {
return nil, err
}

return &Exporter{
w: w,
stopped: false,
}, nil
}

// Export exports logs records to the file.
func (e *Exporter) Export(ctx context.Context, records []log.Record) error {
// Honor context cancellation
if err := ctx.Err(); err != nil {
return err
}

e.mu.Lock()
defer e.mu.Unlock()

if e.stopped {
return nil
}

data := &lpb.LogsData{
ResourceLogs: transform.ResourceLogs(records),
}

by, err := protojson.Marshal(data)
if err != nil {
return err
}

return e.w.Export(by)
}

// ForceFlush flushes data to the file.
func (e *Exporter) ForceFlush(_ context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()

if e.stopped {
return nil
}

return e.w.Flush()
}

// Shutdown shuts down the exporter. Buffered data is written to disk,
// and opened resources such as file will be closed.
func (e *Exporter) Shutdown(_ context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()

if e.stopped {
return nil
}

e.stopped = true
return e.w.Shutdown()
}
101 changes: 101 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlplogfile // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile"
import (
"context"
"fmt"
"os"
"path"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/log"

sdklog "go.opentelemetry.io/otel/sdk/log"
)

// tempFile creates a temporary file for the given test case and returns its path on disk.
// The file is automatically cleaned up when the test ends.
func tempFile(tb testing.TB) *os.File {
f, err := os.CreateTemp(tb.TempDir(), tb.Name())
assert.NoError(tb, err, "must not error when creating temporary file")
tb.Cleanup(func() {
assert.NoError(tb, os.RemoveAll(path.Dir(f.Name())), "must clean up files after being written")
})
return f
}

// makeRecords is a helper function to generate an array of log record with the desired size.
func makeRecords(count int, message string) []sdklog.Record {
var records []sdklog.Record
for i := 0; i < count; i++ {
r := sdklog.Record{}
r.SetSeverityText("INFO")
r.SetSeverity(log.SeverityInfo)
r.SetBody(log.StringValue(message))
r.SetTimestamp(time.Now())
r.SetObservedTimestamp(time.Now())
records = append(records, r)
}
return records
}

func TestExporter(t *testing.T) {
file := tempFile(t)
records := makeRecords(1, "hello, world!")

exporter, err := New(WithWriter(file))
assert.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, exporter.Shutdown(context.TODO()))
})

err = exporter.Export(context.TODO(), records)
assert.NoError(t, err)
err = exporter.ForceFlush(context.TODO())
assert.NoError(t, err)
}

func TestExporterConcurrentSafe(t *testing.T) {
file := tempFile(t)
exporter, err := New(WithWriter(file))
require.NoError(t, err, "New()")

const goroutines = 10

var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
runs := new(uint64)
for i := 0; i < goroutines; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
_ = exporter.Export(ctx, makeRecords(1, fmt.Sprintf("log from goroutine %d", i)))
_ = exporter.ForceFlush(ctx)
atomic.AddUint64(runs, 1)
}
}
}()
}

for atomic.LoadUint64(runs) == 0 {
runtime.Gosched()
}

assert.NoError(t, exporter.Shutdown(ctx), "must not error when shutting down")
cancel()
wg.Wait()
}
37 changes: 37 additions & 0 deletions exporters/otlp/otlplog/otlplogfile/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
module go.opentelemetry.io/otel/exporters/otlp/otlplog/otlplogfile

go 1.22

require (
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.29.0
go.opentelemetry.io/otel/log v0.5.0
go.opentelemetry.io/otel/sdk v1.29.0
go.opentelemetry.io/otel/sdk/log v0.4.0
go.opentelemetry.io/otel/trace v1.29.0
go.opentelemetry.io/proto/otlp v1.3.1
google.golang.org/protobuf v1.34.1
dmathieu marked this conversation as resolved.
Show resolved Hide resolved
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
golang.org/x/sys v0.24.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace go.opentelemetry.io/otel => ../../../..

replace go.opentelemetry.io/otel/sdk/log => ../../../../sdk/log

replace go.opentelemetry.io/otel/sdk => ../../../../sdk

replace go.opentelemetry.io/otel/log => ../../../../log

replace go.opentelemetry.io/otel/trace => ../../../../trace

replace go.opentelemetry.io/otel/metric => ../../../../metric
Loading