Skip to content

Commit

Permalink
[CAPPL-40] Add custom compute capability (#14496)
Browse files Browse the repository at this point in the history
* [CAPPL-40] Add Custom Compute Capability

- Pass through binary + config through the workflow engine

* Address feedback

* Add wasmtest

* Add evictAfterSize

* Small changes

* Linting

* Changeset

* Remove errant error handling

* Error wrapping

* Quote errors

* Test fixes
  • Loading branch information
cedric-cordenier authored Oct 4, 2024
1 parent 8ca41fc commit 25c4698
Show file tree
Hide file tree
Showing 24 changed files with 893 additions and 34 deletions.
5 changes: 5 additions & 0 deletions .changeset/smooth-queens-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Add compute capability #internal
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,5 @@ override*.toml
ocr_soak_report.csv

vendor/*

*.wasm
132 changes: 132 additions & 0 deletions core/capabilities/compute/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package compute

import (
"sync"
"time"

"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
)

var (
moduleCacheHit = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "compute_module_cache_hit",
Help: "hit vs non-hits of the module cache for custom compute",
}, []string{"hit"})
moduleCacheEviction = promauto.NewCounter(prometheus.CounterOpts{
Name: "compute_module_cache_eviction",
Help: "evictions from the module cache",
})
moduleCacheAddition = promauto.NewCounter(prometheus.CounterOpts{
Name: "compute_module_cache_addition",
Help: "additions to the module cache",
})
)

type moduleCache struct {
m map[string]*module
mu sync.RWMutex

wg sync.WaitGroup
stopChan services.StopChan

tickInterval time.Duration
timeout time.Duration
evictAfterSize int

clock clockwork.Clock
onReaper chan struct{}
}

func newModuleCache(clock clockwork.Clock, tick, timeout time.Duration, evictAfterSize int) *moduleCache {
return &moduleCache{
m: map[string]*module{},
tickInterval: tick,
timeout: timeout,
evictAfterSize: evictAfterSize,
clock: clock,
stopChan: make(chan struct{}),
}
}

func (mc *moduleCache) start() {
mc.wg.Add(1)
go func() {
defer mc.wg.Done()
mc.reapLoop()
}()
}

func (mc *moduleCache) close() {
close(mc.stopChan)
mc.wg.Wait()
}

func (mc *moduleCache) reapLoop() {
ticker := mc.clock.NewTicker(mc.tickInterval)
for {
select {
case <-ticker.Chan():
mc.evictOlderThan(mc.timeout)
if mc.onReaper != nil {
mc.onReaper <- struct{}{}
}
case <-mc.stopChan:
return
}
}
}

func (mc *moduleCache) add(id string, mod *module) {
mc.mu.Lock()
defer mc.mu.Unlock()
mod.lastFetchedAt = time.Now()
mc.m[id] = mod
moduleCacheAddition.Inc()
}

func (mc *moduleCache) get(id string) (*module, bool) {
mc.mu.Lock()
defer mc.mu.Unlock()
gotModule, ok := mc.m[id]
if !ok {
moduleCacheHit.WithLabelValues("false").Inc()
return nil, false
}

moduleCacheHit.WithLabelValues("true").Inc()
gotModule.lastFetchedAt = mc.clock.Now()
return gotModule, true
}

func (mc *moduleCache) evictOlderThan(duration time.Duration) {
mc.mu.Lock()
defer mc.mu.Unlock()

evicted := 0

if len(mc.m) > mc.evictAfterSize {
for id, m := range mc.m {
if mc.clock.Now().Sub(m.lastFetchedAt) > duration {
delete(mc.m, id)
m.module.Close()
evicted++
}

if len(mc.m) <= mc.evictAfterSize {
break
}
}
}

moduleCacheEviction.Add(float64(evicted))
}

type module struct {
module *host.Module
lastFetchedAt time.Time
}
88 changes: 88 additions & 0 deletions core/capabilities/compute/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package compute

import (
"testing"
"time"

"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/wasmtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

const (
binaryLocation = "test/simple/cmd/testmodule.wasm"
binaryCmd = "core/capabilities/compute/test/simple/cmd"
)

func TestCache(t *testing.T) {
clock := clockwork.NewFakeClock()
tick := 1 * time.Second
timeout := 1 * time.Second

cache := newModuleCache(clock, tick, timeout, 0)
cache.onReaper = make(chan struct{}, 1)
cache.start()
defer cache.close()

binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, false, t)
hmod, err := host.NewModule(&host.ModuleConfig{
Logger: logger.TestLogger(t),
IsUncompressed: true,
}, binary)
require.NoError(t, err)

id := uuid.New().String()
mod := &module{
module: hmod,
}
cache.add(id, mod)

got, ok := cache.get(id)
assert.True(t, ok)

assert.Equal(t, got, mod)

clock.Advance(15 * time.Second)
<-cache.onReaper
_, ok = cache.get(id)
assert.False(t, ok)
}

func TestCache_EvictAfterSize(t *testing.T) {
clock := clockwork.NewFakeClock()
tick := 1 * time.Second
timeout := 1 * time.Second

cache := newModuleCache(clock, tick, timeout, 1)
cache.onReaper = make(chan struct{}, 1)
cache.start()
defer cache.close()

binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, false, t)
hmod, err := host.NewModule(&host.ModuleConfig{
Logger: logger.TestLogger(t),
IsUncompressed: true,
}, binary)
require.NoError(t, err)

id := uuid.New().String()
mod := &module{
module: hmod,
}
cache.add(id, mod)

got, ok := cache.get(id)
assert.True(t, ok)

assert.Equal(t, got, mod)

clock.Advance(15 * time.Second)
<-cache.onReaper
_, ok = cache.get(id)
assert.True(t, ok)
}
Loading

0 comments on commit 25c4698

Please sign in to comment.