Skip to content

Commit

Permalink
Add redirectlogs processor
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Feb 13, 2024
1 parent d80aac7 commit 68ba69f
Showing 1 changed file with 97 additions and 0 deletions.
97 changes: 97 additions & 0 deletions model/modelprocessor/redirectlogs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package modelprocessor

import (
"context"
"encoding/binary"
"fmt"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"strconv"
"time"

"github.com/elastic/apm-data/model/modelpb"
)

// RedirectLogs extracts all APMEvent logs from Batch, convert them to otel logs and send them to otelLogsProcessor,
// and remove them from the original batch.
type RedirectLogs struct {
OTelLogsProcessor modelpb.OTelLogsProcessor
}

func (r RedirectLogs) ProcessBatch(ctx context.Context, batch *modelpb.Batch) error {
// FIXME(carsonip): alloc
b := make(modelpb.Batch, 0, len(*batch))
for _, e := range *batch {
if e.Type() != modelpb.LogEventType {
b = append(b, e)
continue
}
logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
rAttrs := rl.Resource().Attributes()
rAttrs.PutStr("service.name", e.GetService().GetName())
rAttrs.PutStr("service.version", e.GetService().GetVersion())
sl := rl.ScopeLogs().AppendEmpty()
sAttrs := sl.Scope().Attributes()
sl.Scope().SetName(e.GetService().GetFramework().GetName())
sl.Scope().SetVersion(e.GetService().GetFramework().GetVersion())
sAttrs.PutStr("log.logger", e.GetLog().GetLogger())
lr := sl.LogRecords().AppendEmpty()
lr.SetTimestamp(pcommon.Timestamp(e.GetTimestamp()))
lr.SetObservedTimestamp(pcommon.Timestamp(time.Now().UnixNano()))
lr.Body().SetStr(e.GetMessage())
if spanID := e.GetSpan().GetId(); spanID != "" {
if len(spanID) != 16 {
return fmt.Errorf("spanID len != 16; spanID = %q", spanID)
}
i, err := strconv.ParseUint(spanID, 16, 64)
if err != nil {
return err
}
var bytes [8]byte
binary.BigEndian.PutUint64(bytes[:], i)
lr.SetSpanID(bytes)
}
if traceID := e.GetTrace().GetId(); traceID != "" {
if len(traceID) != 32 {
return fmt.Errorf("traceID len != 32; traceID = %q", traceID)
}
upper, err := strconv.ParseUint(traceID[:16], 16, 64)
if err != nil {
return err
}
lower, err := strconv.ParseUint(traceID[16:], 16, 64)
if err != nil {
return err
}
var bytes [16]byte
binary.BigEndian.PutUint64(bytes[:8], upper)
binary.BigEndian.PutUint64(bytes[8:], lower)
lr.SetTraceID(bytes)
}
// FIXME(carsonip): there are many more other fields to be translated
if err := r.OTelLogsProcessor.ProcessLogs(ctx, logs); err != nil {
return err
}
}
// Mutate batch to filter away logs
*batch = b
return nil
}

0 comments on commit 68ba69f

Please sign in to comment.