Skip to content

Commit

Permalink
[BCF-2631] Add telemetryType to config; small adjustments to pipeline (
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier authored Nov 7, 2023
1 parent 71f4b32 commit 6de9cc4
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 97 deletions.
56 changes: 33 additions & 23 deletions pkg/loop/internal/pb/pipeline_runner.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/loop/internal/pb/pipeline_runner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ message TaskResult {
google.protobuf.Value value = 3;
string error = 4;
bool has_error = 5;
int32 index = 6;
bool is_terminal = 6;
int32 index = 7;
}

message RunResponse {
Expand Down
86 changes: 48 additions & 38 deletions pkg/loop/internal/pb/reporting_plugin_service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/loop/internal/pb/reporting_plugin_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ message ReportingPluginServiceConfig {
string command = 2;
string pluginName = 3;
string pluginConfig = 4;
string telemetryType = 5;
}

// NewReportingPluginFactoryRequest has arguments for [github.com/smartcontractkit/chainlink-relay/pkg/loop/reporting_plugins/LOOPPService.NewReportingPluginFactory].
Expand Down
26 changes: 15 additions & 11 deletions pkg/loop/internal/pipeline_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func newPipelineRunnerClient(cc grpc.ClientConnInterface) *pipelineRunnerService
return &pipelineRunnerServiceClient{grpc: pb.NewPipelineRunnerServiceClient(cc)}
}

func (p pipelineRunnerServiceClient) ExecuteRun(ctx context.Context, spec string, vars types.Vars, options types.Options) ([]types.TaskResult, error) {
func (p pipelineRunnerServiceClient) ExecuteRun(ctx context.Context, spec string, vars types.Vars, options types.Options) (types.TaskResults, error) {
varsStruct, err := structpb.NewStruct(vars.Vars)
if err != nil {
return nil, err
Expand All @@ -49,10 +49,13 @@ func (p pipelineRunnerServiceClient) ExecuteRun(ctx context.Context, spec string
err = errors.New(trr.Error)
}
trs[i] = types.TaskResult{
ID: trr.Id,
Type: trr.Type,
Value: trr.Value.AsInterface(),
Error: err,
ID: trr.Id,
Type: trr.Type,
TaskValue: types.TaskValue{
Value: trr.Value.AsInterface(),
Error: err,
IsTerminal: trr.IsTerminal,
},
Index: int(trr.Index),
}
}
Expand Down Expand Up @@ -94,12 +97,13 @@ func (p *pipelineRunnerServiceServer) ExecuteRun(ctx context.Context, rr *pb.Run
errs = trr.Error.Error()
}
taskResults[i] = &pb.TaskResult{
Id: trr.ID,
Type: trr.Type,
Value: v,
Error: errs,
HasError: hasError,
Index: int32(trr.Index),
Id: trr.ID,
Type: trr.Type,
Value: v,
Error: errs,
HasError: hasError,
IsTerminal: trr.IsTerminal,
Index: int32(trr.Index),
}
}

Expand Down
14 changes: 9 additions & 5 deletions pkg/loop/internal/pipeline_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type mockPipelineRunner struct {
options types.Options
}

func (m *mockPipelineRunner) ExecuteRun(ctx context.Context, spec string, vars types.Vars, options types.Options) ([]types.TaskResult, error) {
func (m *mockPipelineRunner) ExecuteRun(ctx context.Context, spec string, vars types.Vars, options types.Options) (types.TaskResults, error) {
m.spec, m.vars, m.options = spec, vars, options
return m.taskResults, m.err
}
Expand All @@ -39,13 +39,17 @@ func (c *clientAdapter) ExecuteRun(ctx context.Context, in *pb.RunRequest, opts
func TestPipelineRunnerService(t *testing.T) {
originalResults := []types.TaskResult{
{
ID: "1",
Value: float64(123),
ID: "1",
TaskValue: types.TaskValue{
Value: float64(123),
},
Index: 0,
},
{
ID: "2",
Error: errors.New("Error task"),
ID: "2",
TaskValue: types.TaskValue{
Error: errors.New("Error task"),
},
Index: 1,
},
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/loop/internal/reporting_plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ func (m *ReportingPluginServiceClient) NewReportingPluginFactory(

reply, err := m.reportingPluginService.NewReportingPluginFactory(ctx, &pb.NewReportingPluginFactoryRequest{
ReportingPluginServiceConfig: &pb.ReportingPluginServiceConfig{
ProviderType: config.ProviderType,
Command: config.Command,
PluginName: config.PluginName,
PluginConfig: config.PluginConfig,
ProviderType: config.ProviderType,
Command: config.Command,
PluginName: config.PluginName,
TelemetryType: config.TelemetryType,
PluginConfig: config.PluginConfig,
},
ProviderID: providerID,
ErrorLogID: errorLogID,
Expand Down Expand Up @@ -135,10 +136,11 @@ func (m *reportingPluginServiceServer) NewReportingPluginFactory(ctx context.Con
telemetry := NewTelemetryServiceClient(telemetryConn)

config := types.ReportingPluginServiceConfig{
ProviderType: request.ReportingPluginServiceConfig.ProviderType,
PluginConfig: request.ReportingPluginServiceConfig.PluginConfig,
PluginName: request.ReportingPluginServiceConfig.PluginName,
Command: request.ReportingPluginServiceConfig.Command,
ProviderType: request.ReportingPluginServiceConfig.ProviderType,
PluginConfig: request.ReportingPluginServiceConfig.PluginConfig,
PluginName: request.ReportingPluginServiceConfig.PluginName,
Command: request.ReportingPluginServiceConfig.Command,
TelemetryType: request.ReportingPluginServiceConfig.TelemetryType,
}

factory, err := m.impl.NewReportingPluginFactory(ctx, config, providerConn, pipelineRunner, telemetry, errorLog)
Expand Down
Loading

0 comments on commit 6de9cc4

Please sign in to comment.