Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Oct 16, 2023
1 parent 66bf221 commit 0703867
Show file tree
Hide file tree
Showing 18 changed files with 286 additions and 274 deletions.
28 changes: 0 additions & 28 deletions pkg/loop/grpc_plugin_service_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/loop/internal/median.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ type medianProviderClient struct {
func (m *medianProviderClient) ClientConn() grpc.ClientConnInterface { return m.cc }

func newMedianProviderClient(b *brokerExt, cc grpc.ClientConnInterface) *medianProviderClient {
m := &medianProviderClient{pluginProviderClient: newPluginProviderClient(b, cc)}
m := &medianProviderClient{pluginProviderClient: newPluginProviderClient(b.withName("MedianProviderClient"), cc)}
m.reportCodec = &reportCodecClient{b, pb.NewReportCodecClient(m.cc)}
m.medianContract = &medianContractClient{pb.NewMedianContractClient(m.cc)}
m.onchainConfigCodec = &onchainConfigCodecClient{b, pb.NewOnchainConfigCodecClient(m.cc)}
Expand Down
137 changes: 30 additions & 107 deletions pkg/loop/plugin_test.go → pkg/loop/internal/test/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,122 +1,39 @@
package loop_test
package main

import (
"context"
"flag"
"fmt"
"os"
"os/exec"
"strconv"
"testing"
"time"

"github.com/hashicorp/go-plugin"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/loop"
"github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/test"
"github.com/smartcontractkit/chainlink-relay/pkg/loop/reporting_plugins"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink-relay/pkg/utils"
)

// PluginGenericMedian is a generic plugin which wants a median provider
const PluginGenericMedian = "generic-median"

func testPlugin[I any](t *testing.T, name string, p plugin.Plugin, testFn func(*testing.T, I)) {
ctx, cancel := context.WithCancel(utils.Context(t))
defer cancel()

ch := make(chan *plugin.ReattachConfig, 1)
closeCh := make(chan struct{})
go plugin.Serve(&plugin.ServeConfig{
Test: &plugin.ServeTestConfig{
Context: ctx,
ReattachConfigCh: ch,
CloseCh: closeCh,
},
GRPCServer: plugin.DefaultGRPCServer,
Plugins: map[string]plugin.Plugin{name: p},
})

// We should get a config
var config *plugin.ReattachConfig
select {
case config = <-ch:
case <-time.After(5 * time.Second):
t.Fatal("should've received reattach")
}
require.NotNil(t, config)

c := plugin.NewClient(&plugin.ClientConfig{
Reattach: config,
Plugins: map[string]plugin.Plugin{name: p},
})
t.Cleanup(c.Kill)
clientProtocol, err := c.Client()
require.NoError(t, err)
defer clientProtocol.Close()
i, err := clientProtocol.Dispense(name)
require.NoError(t, err)

testFn(t, i.(I))

// stop plugin
cancel()
select {
case <-closeCh:
case <-time.After(5 * time.Second):
t.Fatal("should've stopped")
}
require.Error(t, clientProtocol.Ping())
}
func main() {
cmdS := ""
flag.StringVar(&cmdS, "cmd", "", "the name of the service to run")

func helperProcess(s ...string) *exec.Cmd {
cs := []string{"-test.run=TestHelperProcess", "--"}
cs = append(cs, s...)
env := []string{
"GO_WANT_HELPER_PROCESS=1",
}

cmd := exec.Command(os.Args[0], cs...)
cmd.Env = append(env, os.Environ()...)
return cmd
}

// This is not a real test. This is just a helper process kicked off by
// tests.
func TestHelperProcess(t *testing.T) {
if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
return
}
limitI := 0
flag.IntVar(&limitI, "limit", 0, "the number of services to run")

flag.Parse()
defer os.Exit(0)

args := os.Args
for len(args) > 0 {
if args[0] == "--" {
args = args[1:]
break
}

args = args[1:]
}

if len(args) == 0 {
if cmdS == "" {
fmt.Fprintf(os.Stderr, "No command\n")
os.Exit(2)
}

cmd, args := args[0], args[1:]

limit := -1
if len(args) > 0 {
var err error
limit, err = strconv.Atoi(args[0])
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to parse integer limit: %s\n", err)
os.Exit(2)
}
if limitI != 0 {
limit = limitI
}

grpcServer := func(opts []grpc.ServerOption) *grpc.Server { return grpc.NewServer(opts...) }
Expand All @@ -128,14 +45,20 @@ func TestHelperProcess(t *testing.T) {
}
}

lggr, err := logger.New()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to instantiate logger: %s\n", err)
os.Exit(2)
}

stopCh := make(chan struct{})
defer close(stopCh)
switch cmd {
switch cmdS {
case loop.PluginRelayerName:
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: loop.PluginRelayerHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
loop.PluginRelayerName: &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}},
loop.PluginRelayerName: &loop.GRPCPluginRelayer{PluginServer: test.StaticPluginRelayer{}, BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}},
},
GRPCServer: grpcServer,
})
Expand All @@ -145,20 +68,20 @@ func TestHelperProcess(t *testing.T) {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: loop.PluginMedianHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
loop.PluginRelayerName: &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: logger.Test(t), StopCh: stopCh}},
loop.PluginRelayerName: &loop.GRPCPluginMedian{PluginServer: test.StaticPluginMedian{}, BrokerConfig: loop.BrokerConfig{Logger: lggr, StopCh: stopCh}},
},
GRPCServer: grpcServer,
})
os.Exit(0)

case loop.PluginServiceName:
case "plugin-service":
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: loop.PluginGenericHandshakeConfig(),
HandshakeConfig: reporting_plugins.PluginGenericHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
loop.PluginServiceName: &loop.GRPCPluginService[types.PluginProvider]{
"plugin-service": &reporting_plugins.GRPCPluginService[types.PluginProvider]{
PluginServer: test.StaticGenericPlugin{},
BrokerConfig: loop.BrokerConfig{
Logger: logger.Test(t),
Logger: lggr,
StopCh: stopCh,
},
},
Expand All @@ -167,14 +90,14 @@ func TestHelperProcess(t *testing.T) {
})
os.Exit(0)

case PluginGenericMedian:
case "generic-median":
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: loop.PluginGenericHandshakeConfig(),
HandshakeConfig: reporting_plugins.PluginGenericHandshakeConfig(),
Plugins: map[string]plugin.Plugin{
loop.PluginServiceName: &loop.GRPCPluginService[types.MedianProvider]{
"plugin-service": &reporting_plugins.GRPCPluginService[types.MedianProvider]{
PluginServer: test.StaticGenericPluginMedianProvider{},
BrokerConfig: loop.BrokerConfig{
Logger: logger.Test(t),
Logger: lggr,
StopCh: stopCh,
},
},
Expand All @@ -184,7 +107,7 @@ func TestHelperProcess(t *testing.T) {
os.Exit(0)

default:
fmt.Fprintf(os.Stderr, "Unknown command: %q\n", cmd)
fmt.Fprintf(os.Stderr, "Unknown command: %q\n", cmdS)
os.Exit(2)
}
}
Expand Down
20 changes: 0 additions & 20 deletions pkg/loop/internal/test/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,13 @@ import (
"context"
"fmt"
"reflect"
"testing"

"google.golang.org/grpc"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-relay/pkg/loop/internal"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink-relay/pkg/utils"
)

func TestPluginGeneric(t *testing.T, p types.PluginClient) {
PluginGenericTest{}.TestPluginGeneric(t, p)
}

type PluginGenericTest struct{}

func (test PluginGenericTest) TestPluginGeneric(t *testing.T, p types.PluginClient) {
t.Run("PluginServer", func(t *testing.T) {
ctx := utils.Context(t)
factory, err := p.NewPluginServiceFactory(ctx, types.PluginServiceConfig{}, MockConn{}, &StaticErrorLog{})
require.NoError(t, err)

TestReportingPluginFactory(t, factory)
})
}

type MockConn struct {
grpc.ClientConnInterface
}
Expand Down
79 changes: 79 additions & 0 deletions pkg/loop/internal/test/test_plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package test

import (
"context"
"fmt"
"os"
"os/exec"
"testing"
"time"

"github.com/hashicorp/go-plugin"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-relay/pkg/utils"
)

type HelperProcessOptions struct {
Limit int
}

func HelperProcess(commandLocation string, command string, opts HelperProcessOptions) *exec.Cmd {
cmdArgs := []string{
"go", "run", commandLocation, fmt.Sprintf("-cmd=%s", command),
}
if opts.Limit != 0 {
cmdArgs = append(cmdArgs, fmt.Sprintf("-limit=%d", opts.Limit))
}
cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
cmd.Env = append(os.Environ())
return cmd
}

func PluginTest[I any](t *testing.T, name string, p plugin.Plugin, testFn func(*testing.T, I)) {
ctx, cancel := context.WithCancel(utils.Context(t))
defer cancel()

ch := make(chan *plugin.ReattachConfig, 1)
closeCh := make(chan struct{})
go plugin.Serve(&plugin.ServeConfig{
Test: &plugin.ServeTestConfig{
Context: ctx,
ReattachConfigCh: ch,
CloseCh: closeCh,
},
GRPCServer: plugin.DefaultGRPCServer,
Plugins: map[string]plugin.Plugin{name: p},
})

// We should get a config
var config *plugin.ReattachConfig
select {
case config = <-ch:
case <-time.After(5 * time.Second):
t.Fatal("should've received reattach")
}
require.NotNil(t, config)

c := plugin.NewClient(&plugin.ClientConfig{
Reattach: config,
Plugins: map[string]plugin.Plugin{name: p},
})
t.Cleanup(c.Kill)
clientProtocol, err := c.Client()
require.NoError(t, err)
defer clientProtocol.Close()
i, err := clientProtocol.Dispense(name)
require.NoError(t, err)

testFn(t, i.(I))

// stop plugin
cancel()
select {
case <-closeCh:
case <-time.After(5 * time.Second):
t.Fatal("should've stopped")
}
require.Error(t, clientProtocol.Ping())
}
11 changes: 4 additions & 7 deletions pkg/loop/median_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ import (

"github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/types"
"github.com/smartcontractkit/chainlink-relay/pkg/utils"
)

var _ ocrtypes.ReportingPluginFactory = (*MedianService)(nil)

// MedianService is a [types.Service] that maintains an internal [types.PluginMedian].
type MedianService struct {
pluginService[*GRPCPluginMedian, types.ReportingPluginFactory]
PluginService[*GRPCPluginMedian, types.ReportingPluginFactory]
}

// NewMedianService returns a new [*MedianService].
Expand All @@ -34,15 +33,13 @@ func NewMedianService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cm
lggr = logger.Named(lggr, "MedianService")
var ms MedianService
broker := BrokerConfig{StopCh: stopCh, Logger: lggr, GRPCOpts: grpcOpts}
ms.init(PluginMedianName, &GRPCPluginMedian{BrokerConfig: broker}, newService, lggr, cmd, stopCh)
ms.Init(PluginMedianName, &GRPCPluginMedian{BrokerConfig: broker}, newService, lggr, cmd, stopCh)
return &ms
}

func (m *MedianService) NewReportingPlugin(config ocrtypes.ReportingPluginConfig) (ocrtypes.ReportingPlugin, ocrtypes.ReportingPluginInfo, error) {
ctx, cancel := utils.ContextFromChan(m.pluginService.stopCh)
defer cancel()
if err := m.wait(ctx); err != nil {
if err := m.Wait(context.Background()); err != nil {
return nil, ocrtypes.ReportingPluginInfo{}, err
}
return m.service.NewReportingPlugin(config)
return m.Service.NewReportingPlugin(config)
}
Loading

0 comments on commit 0703867

Please sign in to comment.