Skip to content

Commit

Permalink
internal/civisibility/integrations/gotesting: fixes for orchestrion a…
Browse files Browse the repository at this point in the history
…utoinstrumentation (#2844)
  • Loading branch information
tonyredondo authored Sep 12, 2024
1 parent d1b31e8 commit e1ca75f
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 151 deletions.
3 changes: 3 additions & 0 deletions ddtrace/tracer/civisibility_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/tinylib/msgp/msgp"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"
)

Expand Down Expand Up @@ -46,6 +47,7 @@ func (p *ciVisibilityPayload) push(event *ciVisibilityEvent) error {
//
// A pointer to a newly initialized civisibilitypayload instance.
func newCiVisibilityPayload() *ciVisibilityPayload {
log.Debug("ciVisibilityPayload: creating payload instance")
return &ciVisibilityPayload{newPayload()}
}

Expand All @@ -61,6 +63,7 @@ func newCiVisibilityPayload() *ciVisibilityPayload {
// A pointer to a bytes.Buffer containing the encoded CI Visibility payload.
// An error if reading from the buffer or encoding the payload fails.
func (p *ciVisibilityPayload) getBuffer(config *config) (*bytes.Buffer, error) {
log.Debug("ciVisibilityPayload: .getBuffer (count: %v)", p.itemCount())

/*
The Payload format in the CI Visibility protocol is like this:
Expand Down
3 changes: 3 additions & 0 deletions ddtrace/tracer/civisibility_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func newCiVisibilityTransport(config *config) *ciVisibilityTransport {
testCycleURL = fmt.Sprintf("%s/%s/%s", config.agentURL.String(), EvpProxyPath, TestCyclePath)
}

log.Debug("ciVisibilityTransport: creating transport instance [agentless: %v, testcycleurl: %v]", agentlessEnabled, testCycleURL)

return &ciVisibilityTransport{
config: config,
testCycleURLPath: testCycleURL,
Expand Down Expand Up @@ -157,6 +159,7 @@ func (t *ciVisibilityTransport) send(p *payload) (body io.ReadCloser, err error)
req.Header.Set("Content-Encoding", "gzip")
}

log.Debug("ciVisibilityTransport: sending transport request: %v bytes", buffer.Len())
response, err := t.config.httpClient.Do(req)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions ddtrace/tracer/civisibility_tslv.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func getCiVisibilityEvent(span *span) *ciVisibilityEvent {
// A pointer to the created ciVisibilityEvent.
func createTestEventFromSpan(span *span) *ciVisibilityEvent {
tSpan := createTslvSpan(span)
tSpan.ParentID = 0
tSpan.SessionID = getAndRemoveMetaToUInt64(span, constants.TestSessionIDTag)
tSpan.ModuleID = getAndRemoveMetaToUInt64(span, constants.TestModuleIDTag)
tSpan.SuiteID = getAndRemoveMetaToUInt64(span, constants.TestSuiteIDTag)
Expand All @@ -298,6 +299,7 @@ func createTestEventFromSpan(span *span) *ciVisibilityEvent {
// A pointer to the created ciVisibilityEvent.
func createTestSuiteEventFromSpan(span *span) *ciVisibilityEvent {
tSpan := createTslvSpan(span)
tSpan.ParentID = 0
tSpan.SessionID = getAndRemoveMetaToUInt64(span, constants.TestSessionIDTag)
tSpan.ModuleID = getAndRemoveMetaToUInt64(span, constants.TestModuleIDTag)
tSpan.SuiteID = getAndRemoveMetaToUInt64(span, constants.TestSuiteIDTag)
Expand All @@ -320,6 +322,7 @@ func createTestSuiteEventFromSpan(span *span) *ciVisibilityEvent {
// A pointer to the created ciVisibilityEvent.
func createTestModuleEventFromSpan(span *span) *ciVisibilityEvent {
tSpan := createTslvSpan(span)
tSpan.ParentID = 0
tSpan.SessionID = getAndRemoveMetaToUInt64(span, constants.TestSessionIDTag)
tSpan.ModuleID = getAndRemoveMetaToUInt64(span, constants.TestModuleIDTag)
return &ciVisibilityEvent{
Expand All @@ -341,6 +344,7 @@ func createTestModuleEventFromSpan(span *span) *ciVisibilityEvent {
// A pointer to the created ciVisibilityEvent.
func createTestSessionEventFromSpan(span *span) *ciVisibilityEvent {
tSpan := createTslvSpan(span)
tSpan.ParentID = 0
tSpan.SessionID = getAndRemoveMetaToUInt64(span, constants.TestSessionIDTag)
return &ciVisibilityEvent{
span: span,
Expand Down
11 changes: 6 additions & 5 deletions ddtrace/tracer/civisibility_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type ciVisibilityTraceWriter struct {
//
// A pointer to an initialized ciVisibilityTraceWriter.
func newCiVisibilityTraceWriter(c *config) *ciVisibilityTraceWriter {
log.Debug("ciVisibilityTraceWriter: creating trace writer instance")
return &ciVisibilityTraceWriter{
config: c,
payload: newCiVisibilityPayload(),
Expand All @@ -62,7 +63,7 @@ func (w *ciVisibilityTraceWriter) add(trace []*span) {
for _, s := range trace {
cvEvent := getCiVisibilityEvent(s)
if err := w.payload.push(cvEvent); err != nil {
log.Error("Error encoding msgpack: %v", err)
log.Error("ciVisibilityTraceWriter: Error encoding msgpack: %v", err)
}
if w.payload.size() > agentlessPayloadSizeLimit {
w.flush()
Expand Down Expand Up @@ -104,16 +105,16 @@ func (w *ciVisibilityTraceWriter) flush() {
var err error
for attempt := 0; attempt <= w.config.sendRetries; attempt++ {
size, count = p.size(), p.itemCount()
log.Debug("Sending payload: size: %d events: %d\n", size, count)
log.Debug("ciVisibilityTraceWriter: sending payload: size: %d events: %d\n", size, count)
_, err = w.config.transport.send(p.payload)
if err == nil {
log.Debug("sent events after %d attempts", attempt+1)
log.Debug("ciVisibilityTraceWriter: sent events after %d attempts", attempt+1)
return
}
log.Error("failure sending events (attempt %d), will retry: %v", attempt+1, err)
log.Error("ciVisibilityTraceWriter: failure sending events (attempt %d), will retry: %v", attempt+1, err)
p.reset()
time.Sleep(time.Millisecond)
}
log.Error("lost %d events: %v", count, err)
log.Error("ciVisibilityTraceWriter: lost %d events: %v", count, err)
}(oldp)
}
185 changes: 127 additions & 58 deletions internal/civisibility/integrations/gotesting/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/integrations"
Expand All @@ -23,6 +25,66 @@ import (
// The following functions are being used by the gotesting package for manual instrumentation and the orchestrion
// automatic instrumentation

type (
instrumentationMetadata struct {
IsInternal bool
}

ddTestItem struct {
test integrations.DdTest
error atomic.Int32
skipped atomic.Int32
}
)

var (
// instrumentationMap holds a map of *runtime.Func for tracking instrumented functions
instrumentationMap = map[*runtime.Func]*instrumentationMetadata{}

// instrumentationMapMutex is a read-write mutex for synchronizing access to instrumentationMap.
instrumentationMapMutex sync.RWMutex

// ciVisibilityTests holds a map of *testing.T or *testing.B to civisibility.DdTest for tracking tests.
ciVisibilityTests = map[unsafe.Pointer]*ddTestItem{}

// ciVisibilityTestsMutex is a read-write mutex for synchronizing access to ciVisibilityTests.
ciVisibilityTestsMutex sync.RWMutex
)

// getInstrumentationMetadata gets the stored instrumentation metadata for a given *runtime.Func.
func getInstrumentationMetadata(fn *runtime.Func) *instrumentationMetadata {
instrumentationMapMutex.RLock()
defer instrumentationMapMutex.RUnlock()
if v, ok := instrumentationMap[fn]; ok {
return v
}
return nil
}

// setInstrumentationMetadata stores an instrumentation metadata for a given *runtime.Func.
func setInstrumentationMetadata(fn *runtime.Func, metadata *instrumentationMetadata) {
instrumentationMapMutex.RLock()
defer instrumentationMapMutex.RUnlock()
instrumentationMap[fn] = metadata
}

// getCiVisibilityTest retrieves the CI visibility test associated with a given *testing.T, *testing.B, *testing.common
func getCiVisibilityTest(tb testing.TB) *ddTestItem {
ciVisibilityTestsMutex.RLock()
defer ciVisibilityTestsMutex.RUnlock()
if v, ok := ciVisibilityTests[reflect.ValueOf(tb).UnsafePointer()]; ok {
return v
}
return nil
}

// setCiVisibilityTest associates a CI visibility test with a given *testing.T, *testing.B, *testing.common
func setCiVisibilityTest(tb testing.TB, ciTest integrations.DdTest) {
ciVisibilityTestsMutex.Lock()
defer ciVisibilityTestsMutex.Unlock()
ciVisibilityTests[reflect.ValueOf(tb).UnsafePointer()] = &ddTestItem{test: ciTest}
}

// instrumentTestingM helper function to instrument internalTests and internalBenchmarks in a `*testing.M` instance.
func instrumentTestingM(m *testing.M) func(exitCode int) {
// Initialize CI Visibility
Expand Down Expand Up @@ -61,13 +123,30 @@ func instrumentTestingTFunc(f func(*testing.T)) func(*testing.T) {
moduleName, suiteName := utils.GetModuleAndSuiteName(fReflect.Pointer())
originalFunc := runtime.FuncForPC(fReflect.Pointer())

// Increment the test count in the module.
atomic.AddInt32(modulesCounters[moduleName], 1)
// Avoid instrumenting twice
metadata := getInstrumentationMetadata(originalFunc)
if metadata != nil && metadata.IsInternal {
// If is an internal test, we don't instrument because f is already the instrumented func by executeInternalTest
return f
}

instrumentedFn := func(t *testing.T) {
// Initialize module counters if not already present.
if _, ok := modulesCounters[moduleName]; !ok {
var v int32
modulesCounters[moduleName] = &v
}
// Increment the test count in the module.
atomic.AddInt32(modulesCounters[moduleName], 1)

// Increment the test count in the suite.
atomic.AddInt32(suitesCounters[suiteName], 1)
// Initialize suite counters if not already present.
if _, ok := suitesCounters[suiteName]; !ok {
var v int32
suitesCounters[suiteName] = &v
}
// Increment the test count in the suite.
atomic.AddInt32(suitesCounters[suiteName], 1)

return func(t *testing.T) {
// Create or retrieve the module, suite, and test for CI visibility.
module := session.GetOrCreateModuleWithFramework(moduleName, testFramework, runtime.Version())
suite := module.GetOrCreateSuite(suiteName)
Expand Down Expand Up @@ -101,51 +180,47 @@ func instrumentTestingTFunc(f func(*testing.T)) func(*testing.T) {
// Execute the original test function.
f(t)
}
setInstrumentationMetadata(runtime.FuncForPC(reflect.Indirect(reflect.ValueOf(instrumentedFn)).Pointer()), &instrumentationMetadata{IsInternal: true})
return instrumentedFn
}

// instrumentTestingTSetErrorInfo helper function to set an error in the `testing.T` CI Visibility span
func instrumentTestingTSetErrorInfo(t *testing.T, errType string, errMessage string, skip int) {
ciTest := getCiVisibilityTest(t)
if ciTest != nil {
ciTest.SetErrorInfo(errType, errMessage, utils.GetStacktrace(2+skip))
// instrumentSetErrorInfo helper function to set an error in the `*testing.T, *testing.B, *testing.common` CI Visibility span
func instrumentSetErrorInfo(tb testing.TB, errType string, errMessage string, skip int) {
ciTestItem := getCiVisibilityTest(tb)
if ciTestItem != nil && ciTestItem.error.CompareAndSwap(0, 1) && ciTestItem.test != nil {
ciTestItem.test.SetErrorInfo(errType, errMessage, utils.GetStacktrace(2+skip))
}
}

// instrumentTestingTCloseAndSkip helper function to close and skip with a reason a `testing.T` CI Visibility span
func instrumentTestingTCloseAndSkip(t *testing.T, skipReason string) {
ciTest := getCiVisibilityTest(t)
if ciTest != nil {
ciTest.CloseWithFinishTimeAndSkipReason(integrations.ResultStatusSkip, time.Now(), skipReason)
// instrumentCloseAndSkip helper function to close and skip with a reason a `*testing.T, *testing.B, *testing.common` CI Visibility span
func instrumentCloseAndSkip(tb testing.TB, skipReason string) {
ciTestItem := getCiVisibilityTest(tb)
if ciTestItem != nil && ciTestItem.skipped.CompareAndSwap(0, 1) && ciTestItem.test != nil {
ciTestItem.test.CloseWithFinishTimeAndSkipReason(integrations.ResultStatusSkip, time.Now(), skipReason)
}
}

// instrumentTestingTSkipNow helper function to close and skip a `testing.T` CI Visibility span
func instrumentTestingTSkipNow(t *testing.T) {
ciTest := getCiVisibilityTest(t)
if ciTest != nil {
ciTest.Close(integrations.ResultStatusSkip)
// instrumentSkipNow helper function to close and skip a `*testing.T, *testing.B, *testing.common` CI Visibility span
func instrumentSkipNow(tb testing.TB) {
ciTestItem := getCiVisibilityTest(tb)
if ciTestItem != nil && ciTestItem.skipped.CompareAndSwap(0, 1) && ciTestItem.test != nil {
ciTestItem.test.Close(integrations.ResultStatusSkip)
}
}

// instrumentTestingBFunc helper function to instrument a benchmark function func(*testing.B)
func instrumentTestingBFunc(pb *testing.B, name string, f func(*testing.B)) (string, func(*testing.B)) {
// Avoid instrumenting twice
if hasCiVisibilityBenchmarkFunc(&f) {
return name, f
}

// Reflect the function to obtain its pointer.
fReflect := reflect.Indirect(reflect.ValueOf(f))
moduleName, suiteName := utils.GetModuleAndSuiteName(fReflect.Pointer())
originalFunc := runtime.FuncForPC(fReflect.Pointer())

// Increment the test count in the module.
atomic.AddInt32(modulesCounters[moduleName], 1)

// Increment the test count in the suite.
atomic.AddInt32(suitesCounters[suiteName], 1)
// Avoid instrumenting twice
if hasCiVisibilityBenchmarkFunc(originalFunc) {
return name, f
}

return subBenchmarkAutoName, func(b *testing.B) {
instrumentedFunc := func(b *testing.B) {
// The sub-benchmark implementation relies on creating a dummy sub benchmark (called [DD:TestVisibility]) with
// a Run over the original sub benchmark function to get the child results without interfering measurements
// By doing this the name of the sub-benchmark are changed
Expand All @@ -155,6 +230,22 @@ func instrumentTestingBFunc(pb *testing.B, name string, f func(*testing.B)) (str
// benchmark/[DD:TestVisibility]/child
// We use regex and decrement the depth level of the benchmark to restore the original name

// Initialize module counters if not already present.
if _, ok := modulesCounters[moduleName]; !ok {
var v int32
modulesCounters[moduleName] = &v
}
// Increment the test count in the module.
atomic.AddInt32(modulesCounters[moduleName], 1)

// Initialize suite counters if not already present.
if _, ok := suitesCounters[suiteName]; !ok {
var v int32
suitesCounters[suiteName] = &v
}
// Increment the test count in the suite.
atomic.AddInt32(suitesCounters[suiteName], 1)

// Decrement level.
bpf := getBenchmarkPrivateFields(b)
bpf.AddLevel(-1)
Expand Down Expand Up @@ -190,7 +281,7 @@ func instrumentTestingBFunc(pb *testing.B, name string, f func(*testing.B)) (str
// Replace this function with the original one (executed only once - the first iteration[b.run1]).
*iPfOfB.benchFunc = f
// Set b to the CI visibility test.
setCiVisibilityBenchmark(b, test)
setCiVisibilityTest(b, test)

// Enable the timer again.
b.ResetTimer()
Expand All @@ -200,8 +291,7 @@ func instrumentTestingBFunc(pb *testing.B, name string, f func(*testing.B)) (str
f(b)
}

setCiVisibilityBenchmarkFunc(&instrumentedFunc)
defer deleteCiVisibilityBenchmarkFunc(&instrumentedFunc)
setCiVisibilityBenchmarkFunc(runtime.FuncForPC(reflect.Indirect(reflect.ValueOf(instrumentedFunc)).Pointer()))
b.Run(name, instrumentedFunc)

endTime := time.Now()
Expand Down Expand Up @@ -258,28 +348,7 @@ func instrumentTestingBFunc(pb *testing.B, name string, f func(*testing.B)) (str

checkModuleAndSuite(module, suite)
}
}

// instrumentTestingBSetErrorInfo helper function to set an error in the `testing.B` CI Visibility span
func instrumentTestingBSetErrorInfo(b *testing.B, errType string, errMessage string, skip int) {
ciTest := getCiVisibilityBenchmark(b)
if ciTest != nil {
ciTest.SetErrorInfo(errType, errMessage, utils.GetStacktrace(2+skip))
}
}

// instrumentTestingBCloseAndSkip helper function to close and skip with a reason a `testing.B` CI Visibility span
func instrumentTestingBCloseAndSkip(b *testing.B, skipReason string) {
ciTest := getCiVisibilityBenchmark(b)
if ciTest != nil {
ciTest.CloseWithFinishTimeAndSkipReason(integrations.ResultStatusSkip, time.Now(), skipReason)
}
}

// instrumentTestingBSkipNow helper function to close and skip a `testing.B` CI Visibility span
func instrumentTestingBSkipNow(b *testing.B) {
ciTest := getCiVisibilityBenchmark(b)
if ciTest != nil {
ciTest.Close(integrations.ResultStatusSkip)
}
setCiVisibilityBenchmarkFunc(originalFunc)
setCiVisibilityBenchmarkFunc(runtime.FuncForPC(reflect.Indirect(reflect.ValueOf(instrumentedFunc)).Pointer()))
return subBenchmarkAutoName, instrumentedFunc
}
Loading

0 comments on commit e1ca75f

Please sign in to comment.