From 8ac1fe34d154c7c3d9ab506011480ed6389806f8 Mon Sep 17 00:00:00 2001 From: george-dorin Date: Wed, 11 Oct 2023 16:55:45 +0300 Subject: [PATCH] Draft --- go.mod | 2 + go.sum | 13 + pkg/loop/internal/pb/generate.go | 1 + pkg/loop/internal/pb/pipeline_runner.pb.go | 539 ++++++++++++++++++ pkg/loop/internal/pb/pipeline_runner.proto | 44 ++ .../internal/pb/pipeline_runner_grpc.pb.go | 109 ++++ pkg/loop/internal/pipeline_runner.go | 125 ++++ pkg/loop/internal/types.go | 5 + pkg/types/pipeline_runner.go | 85 +++ 9 files changed, 923 insertions(+) create mode 100644 pkg/loop/internal/pb/pipeline_runner.pb.go create mode 100644 pkg/loop/internal/pb/pipeline_runner.proto create mode 100644 pkg/loop/internal/pb/pipeline_runner_grpc.pb.go create mode 100644 pkg/loop/internal/pipeline_runner.go create mode 100644 pkg/types/pipeline_runner.go diff --git a/go.mod b/go.mod index 28cfeb406..500c26bbe 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.15.0 github.com/riferrei/srclient v0.5.4 + github.com/satori/go.uuid v1.2.0 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/libocr v0.0.0-20230922131214-122accb19ea6 github.com/stretchr/testify v1.8.4 @@ -26,6 +27,7 @@ require ( golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 google.golang.org/grpc v1.55.0 google.golang.org/protobuf v1.30.0 + gopkg.in/guregu/null.v4 v4.0.0 ) require ( diff --git a/go.sum b/go.sum index 30d454e0a..e00a38f40 100644 --- a/go.sum +++ b/go.sum @@ -21,7 +21,9 @@ cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUM cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= cloud.google.com/go/compute v1.18.0 h1:FEigFqoDbys2cvFkZ9Fjq4gnHBP55anJ0yQyau2f9oY= +cloud.google.com/go/compute v1.18.0/go.mod h1:1X7yHxec2Ga+Ss6jPyjxRxpu2uu7PLgsOVXvgU0yacs= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= +cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= @@ -45,6 +47,7 @@ github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZx github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= +github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= @@ -136,6 +139,7 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -176,6 +180,7 @@ github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7 github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E= github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI= github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= +github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -188,6 +193,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -249,6 +255,8 @@ github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/f github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= github.com/santhosh-tekuri/jsonschema/v5 v5.1.1 h1:lEOLY2vyGIqKWUI9nzsOJRV3mb3WC9dXYORsLEUcoeY= github.com/santhosh-tekuri/jsonschema/v5 v5.1.1/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/smartcontractkit/go-plugin v0.0.0-20231003134350-e49dad63b306 h1:ko88+ZznniNJZbZPWAvHQU8SwKAdHngdDZ+pvVgB5ss= @@ -381,6 +389,7 @@ golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20210413134643-5e61552d6c78/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw= +golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -517,6 +526,7 @@ google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= +google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -594,8 +604,11 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v1 v1.0.0/go.mod h1:CxwszS/Xz1C49Ucd2i6Zil5UToP1EmyrFhKaMVbg1mk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/guregu/null.v4 v4.0.0 h1:1Wm3S1WEA2I26Kq+6vcW+w0gcDo44YKYD7YIEJNHDjg= +gopkg.in/guregu/null.v4 v4.0.0/go.mod h1:YoQhUrADuG3i9WqesrCmpNRwm1ypAgSHYqoOcTu/JrI= gopkg.in/httprequest.v1 v1.2.1/go.mod h1:x2Otw96yda5+8+6ZeWwHIJTFkEHWP/qP8pJOzqEtWPM= gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/retry.v1 v1.0.3/go.mod h1:FJkXmWiMaAo7xB+xhvDF59zhfjDWyzmyAxiT4dB688g= diff --git a/pkg/loop/internal/pb/generate.go b/pkg/loop/internal/pb/generate.go index 4c56e4efc..8df4efd6e 100644 --- a/pkg/loop/internal/pb/generate.go +++ b/pkg/loop/internal/pb/generate.go @@ -1,4 +1,5 @@ //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative relayer.proto //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative reporting.proto //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative median.proto +//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pipeline_runner.proto package pb diff --git a/pkg/loop/internal/pb/pipeline_runner.pb.go b/pkg/loop/internal/pb/pipeline_runner.pb.go new file mode 100644 index 000000000..10670765a --- /dev/null +++ b/pkg/loop/internal/pb/pipeline_runner.pb.go @@ -0,0 +1,539 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.24.3 +// source: pipeline_runner.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + durationpb "google.golang.org/protobuf/types/known/durationpb" + structpb "google.golang.org/protobuf/types/known/structpb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Options struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + PersistErroredRuns bool `protobuf:"varint,1,opt,name=persist_errored_runs,json=persistErroredRuns,proto3" json:"persist_errored_runs,omitempty"` // Controls whether or not we will save the run async + MaxTaskDuration *durationpb.Duration `protobuf:"bytes,2,opt,name=max_task_duration,json=maxTaskDuration,proto3" json:"max_task_duration,omitempty"` + GasLimit uint32 `protobuf:"varint,3,opt,name=gas_limit,json=gasLimit,proto3" json:"gas_limit,omitempty"` + ForwardingAllowed bool `protobuf:"varint,4,opt,name=forwarding_allowed,json=forwardingAllowed,proto3" json:"forwarding_allowed,omitempty"` +} + +func (x *Options) Reset() { + *x = Options{} + if protoimpl.UnsafeEnabled { + mi := &file_pipeline_runner_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Options) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Options) ProtoMessage() {} + +func (x *Options) ProtoReflect() protoreflect.Message { + mi := &file_pipeline_runner_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Options.ProtoReflect.Descriptor instead. +func (*Options) Descriptor() ([]byte, []int) { + return file_pipeline_runner_proto_rawDescGZIP(), []int{0} +} + +func (x *Options) GetPersistErroredRuns() bool { + if x != nil { + return x.PersistErroredRuns + } + return false +} + +func (x *Options) GetMaxTaskDuration() *durationpb.Duration { + if x != nil { + return x.MaxTaskDuration + } + return nil +} + +func (x *Options) GetGasLimit() uint32 { + if x != nil { + return x.GasLimit + } + return 0 +} + +func (x *Options) GetForwardingAllowed() bool { + if x != nil { + return x.ForwardingAllowed + } + return false +} + +type Job struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Type string `protobuf:"bytes,3,opt,name=type,proto3" json:"type,omitempty"` +} + +func (x *Job) Reset() { + *x = Job{} + if protoimpl.UnsafeEnabled { + mi := &file_pipeline_runner_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Job) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Job) ProtoMessage() {} + +func (x *Job) ProtoReflect() protoreflect.Message { + mi := &file_pipeline_runner_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Job.ProtoReflect.Descriptor instead. +func (*Job) Descriptor() ([]byte, []int) { + return file_pipeline_runner_proto_rawDescGZIP(), []int{1} +} + +func (x *Job) GetId() int32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *Job) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *Job) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +type RunRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Source string `protobuf:"bytes,2,opt,name=source,proto3" json:"source,omitempty"` + Vars *structpb.Struct `protobuf:"bytes,3,opt,name=vars,proto3" json:"vars,omitempty"` + Options *Options `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"` + Job *Job `protobuf:"bytes,5,opt,name=job,proto3" json:"job,omitempty"` +} + +func (x *RunRequest) Reset() { + *x = RunRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pipeline_runner_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RunRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RunRequest) ProtoMessage() {} + +func (x *RunRequest) ProtoReflect() protoreflect.Message { + mi := &file_pipeline_runner_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RunRequest.ProtoReflect.Descriptor instead. +func (*RunRequest) Descriptor() ([]byte, []int) { + return file_pipeline_runner_proto_rawDescGZIP(), []int{2} +} + +func (x *RunRequest) GetId() int32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *RunRequest) GetSource() string { + if x != nil { + return x.Source + } + return "" +} + +func (x *RunRequest) GetVars() *structpb.Struct { + if x != nil { + return x.Vars + } + return nil +} + +func (x *RunRequest) GetOptions() *Options { + if x != nil { + return x.Options + } + return nil +} + +func (x *RunRequest) GetJob() *Job { + if x != nil { + return x.Job + } + return nil +} + +type TaskResult struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` + Value *structpb.Value `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` + Index int32 `protobuf:"varint,4,opt,name=index,proto3" json:"index,omitempty"` +} + +func (x *TaskResult) Reset() { + *x = TaskResult{} + if protoimpl.UnsafeEnabled { + mi := &file_pipeline_runner_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TaskResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TaskResult) ProtoMessage() {} + +func (x *TaskResult) ProtoReflect() protoreflect.Message { + mi := &file_pipeline_runner_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TaskResult.ProtoReflect.Descriptor instead. +func (*TaskResult) Descriptor() ([]byte, []int) { + return file_pipeline_runner_proto_rawDescGZIP(), []int{3} +} + +func (x *TaskResult) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *TaskResult) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *TaskResult) GetValue() *structpb.Value { + if x != nil { + return x.Value + } + return nil +} + +func (x *TaskResult) GetIndex() int32 { + if x != nil { + return x.Index + } + return 0 +} + +type RunResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Results []*TaskResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` +} + +func (x *RunResponse) Reset() { + *x = RunResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pipeline_runner_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RunResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RunResponse) ProtoMessage() {} + +func (x *RunResponse) ProtoReflect() protoreflect.Message { + mi := &file_pipeline_runner_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RunResponse.ProtoReflect.Descriptor instead. +func (*RunResponse) Descriptor() ([]byte, []int) { + return file_pipeline_runner_proto_rawDescGZIP(), []int{4} +} + +func (x *RunResponse) GetResults() []*TaskResult { + if x != nil { + return x.Results + } + return nil +} + +var File_pipeline_runner_proto protoreflect.FileDescriptor + +var file_pipeline_runner_proto_rawDesc = []byte{ + 0x0a, 0x15, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x5f, 0x72, 0x75, 0x6e, 0x6e, 0x65, + 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x6c, 0x6f, 0x6f, 0x70, 0x1a, 0x1e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, + 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1c, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, + 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xce, 0x01, 0x0a, 0x07, + 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x30, 0x0a, 0x14, 0x70, 0x65, 0x72, 0x73, 0x69, + 0x73, 0x74, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x65, 0x64, 0x5f, 0x72, 0x75, 0x6e, 0x73, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x70, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x45, 0x72, + 0x72, 0x6f, 0x72, 0x65, 0x64, 0x52, 0x75, 0x6e, 0x73, 0x12, 0x45, 0x0a, 0x11, 0x6d, 0x61, 0x78, + 0x5f, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, + 0x0f, 0x6d, 0x61, 0x78, 0x54, 0x61, 0x73, 0x6b, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x61, 0x73, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x08, 0x67, 0x61, 0x73, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x2d, 0x0a, + 0x12, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x69, 0x6e, 0x67, 0x5f, 0x61, 0x6c, 0x6c, 0x6f, + 0x77, 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x11, 0x66, 0x6f, 0x72, 0x77, 0x61, + 0x72, 0x64, 0x69, 0x6e, 0x67, 0x41, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x64, 0x22, 0x3d, 0x0a, 0x03, + 0x4a, 0x6f, 0x62, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, + 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0xa7, 0x01, 0x0a, 0x0a, + 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x12, 0x2b, 0x0a, 0x04, 0x76, 0x61, 0x72, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x04, 0x76, 0x61, 0x72, 0x73, 0x12, + 0x27, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x0d, 0x2e, 0x6c, 0x6f, 0x6f, 0x70, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, + 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1b, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x6c, 0x6f, 0x6f, 0x70, 0x2e, 0x4a, 0x6f, 0x62, + 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x22, 0x74, 0x0a, 0x0a, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, + 0x75, 0x6c, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x2c, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x22, 0x39, 0x0a, 0x0b, 0x52, + 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x07, 0x72, 0x65, + 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6c, 0x6f, + 0x6f, 0x70, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x32, 0x4a, 0x0a, 0x15, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, + 0x6e, 0x65, 0x52, 0x75, 0x6e, 0x6e, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, + 0x31, 0x0a, 0x0a, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x75, 0x6e, 0x12, 0x10, 0x2e, + 0x6c, 0x6f, 0x6f, 0x70, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x11, 0x2e, 0x6c, 0x6f, 0x6f, 0x70, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, + 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2d, 0x72, 0x65, 0x6c, 0x61, + 0x79, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x6c, 0x6f, 0x6f, 0x70, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pipeline_runner_proto_rawDescOnce sync.Once + file_pipeline_runner_proto_rawDescData = file_pipeline_runner_proto_rawDesc +) + +func file_pipeline_runner_proto_rawDescGZIP() []byte { + file_pipeline_runner_proto_rawDescOnce.Do(func() { + file_pipeline_runner_proto_rawDescData = protoimpl.X.CompressGZIP(file_pipeline_runner_proto_rawDescData) + }) + return file_pipeline_runner_proto_rawDescData +} + +var file_pipeline_runner_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_pipeline_runner_proto_goTypes = []interface{}{ + (*Options)(nil), // 0: loop.Options + (*Job)(nil), // 1: loop.Job + (*RunRequest)(nil), // 2: loop.RunRequest + (*TaskResult)(nil), // 3: loop.TaskResult + (*RunResponse)(nil), // 4: loop.RunResponse + (*durationpb.Duration)(nil), // 5: google.protobuf.Duration + (*structpb.Struct)(nil), // 6: google.protobuf.Struct + (*structpb.Value)(nil), // 7: google.protobuf.Value +} +var file_pipeline_runner_proto_depIdxs = []int32{ + 5, // 0: loop.Options.max_task_duration:type_name -> google.protobuf.Duration + 6, // 1: loop.RunRequest.vars:type_name -> google.protobuf.Struct + 0, // 2: loop.RunRequest.options:type_name -> loop.Options + 1, // 3: loop.RunRequest.job:type_name -> loop.Job + 7, // 4: loop.TaskResult.value:type_name -> google.protobuf.Value + 3, // 5: loop.RunResponse.results:type_name -> loop.TaskResult + 2, // 6: loop.PipelineRunnerService.ExecuteRun:input_type -> loop.RunRequest + 4, // 7: loop.PipelineRunnerService.ExecuteRun:output_type -> loop.RunResponse + 7, // [7:8] is the sub-list for method output_type + 6, // [6:7] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name +} + +func init() { file_pipeline_runner_proto_init() } +func file_pipeline_runner_proto_init() { + if File_pipeline_runner_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pipeline_runner_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Options); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pipeline_runner_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Job); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pipeline_runner_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RunRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pipeline_runner_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TaskResult); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pipeline_runner_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RunResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pipeline_runner_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pipeline_runner_proto_goTypes, + DependencyIndexes: file_pipeline_runner_proto_depIdxs, + MessageInfos: file_pipeline_runner_proto_msgTypes, + }.Build() + File_pipeline_runner_proto = out.File + file_pipeline_runner_proto_rawDesc = nil + file_pipeline_runner_proto_goTypes = nil + file_pipeline_runner_proto_depIdxs = nil +} diff --git a/pkg/loop/internal/pb/pipeline_runner.proto b/pkg/loop/internal/pb/pipeline_runner.proto new file mode 100644 index 000000000..355cee8f7 --- /dev/null +++ b/pkg/loop/internal/pb/pipeline_runner.proto @@ -0,0 +1,44 @@ +syntax = "proto3"; + +option go_package = "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/pb"; + +package loop; + +import "google/protobuf/duration.proto"; +import "google/protobuf/struct.proto"; + +message Options { + bool persist_errored_runs = 1; // Controls whether or not we will save the run async + google.protobuf.Duration max_task_duration = 2; + uint32 gas_limit = 3; + bool forwarding_allowed = 4; +} + +message Job { + int32 id = 1; + string name = 2; + string type = 3; +} + +message RunRequest { + int32 id = 1; + string source = 2; + google.protobuf.Struct vars = 3; + Options options = 4; + Job job = 5; +} + +message TaskResult { + string id = 1; + string type = 2; + google.protobuf.Value value = 3; + int32 index = 4; +} + +message RunResponse { + repeated TaskResult results = 1; +} + +service PipelineRunnerService { + rpc ExecuteRun(RunRequest) returns (RunResponse); +} \ No newline at end of file diff --git a/pkg/loop/internal/pb/pipeline_runner_grpc.pb.go b/pkg/loop/internal/pb/pipeline_runner_grpc.pb.go new file mode 100644 index 000000000..021348964 --- /dev/null +++ b/pkg/loop/internal/pb/pipeline_runner_grpc.pb.go @@ -0,0 +1,109 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.24.3 +// source: pipeline_runner.proto + +package pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + PipelineRunnerService_ExecuteRun_FullMethodName = "/loop.PipelineRunnerService/ExecuteRun" +) + +// PipelineRunnerServiceClient is the client API for PipelineRunnerService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type PipelineRunnerServiceClient interface { + ExecuteRun(ctx context.Context, in *RunRequest, opts ...grpc.CallOption) (*RunResponse, error) +} + +type pipelineRunnerServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewPipelineRunnerServiceClient(cc grpc.ClientConnInterface) PipelineRunnerServiceClient { + return &pipelineRunnerServiceClient{cc} +} + +func (c *pipelineRunnerServiceClient) ExecuteRun(ctx context.Context, in *RunRequest, opts ...grpc.CallOption) (*RunResponse, error) { + out := new(RunResponse) + err := c.cc.Invoke(ctx, PipelineRunnerService_ExecuteRun_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// PipelineRunnerServiceServer is the server API for PipelineRunnerService service. +// All implementations must embed UnimplementedPipelineRunnerServiceServer +// for forward compatibility +type PipelineRunnerServiceServer interface { + ExecuteRun(context.Context, *RunRequest) (*RunResponse, error) + mustEmbedUnimplementedPipelineRunnerServiceServer() +} + +// UnimplementedPipelineRunnerServiceServer must be embedded to have forward compatible implementations. +type UnimplementedPipelineRunnerServiceServer struct { +} + +func (UnimplementedPipelineRunnerServiceServer) ExecuteRun(context.Context, *RunRequest) (*RunResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ExecuteRun not implemented") +} +func (UnimplementedPipelineRunnerServiceServer) mustEmbedUnimplementedPipelineRunnerServiceServer() {} + +// UnsafePipelineRunnerServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to PipelineRunnerServiceServer will +// result in compilation errors. +type UnsafePipelineRunnerServiceServer interface { + mustEmbedUnimplementedPipelineRunnerServiceServer() +} + +func RegisterPipelineRunnerServiceServer(s grpc.ServiceRegistrar, srv PipelineRunnerServiceServer) { + s.RegisterService(&PipelineRunnerService_ServiceDesc, srv) +} + +func _PipelineRunnerService_ExecuteRun_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RunRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PipelineRunnerServiceServer).ExecuteRun(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: PipelineRunnerService_ExecuteRun_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PipelineRunnerServiceServer).ExecuteRun(ctx, req.(*RunRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// PipelineRunnerService_ServiceDesc is the grpc.ServiceDesc for PipelineRunnerService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var PipelineRunnerService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "loop.PipelineRunnerService", + HandlerType: (*PipelineRunnerServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ExecuteRun", + Handler: _PipelineRunnerService_ExecuteRun_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pipeline_runner.proto", +} diff --git a/pkg/loop/internal/pipeline_runner.go b/pkg/loop/internal/pipeline_runner.go new file mode 100644 index 000000000..294847405 --- /dev/null +++ b/pkg/loop/internal/pipeline_runner.go @@ -0,0 +1,125 @@ +package internal + +import ( + "context" + "time" + + uuid3 "github.com/google/uuid" + uuid "github.com/satori/go.uuid" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/structpb" + "gopkg.in/guregu/null.v4" + + "github.com/smartcontractkit/chainlink-relay/pkg/logger" + "github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/pb" + "github.com/smartcontractkit/chainlink-relay/pkg/types" +) + +var _ PipelineRunner = (*pipelineRunnerServiceClient)(nil) + +type pipelineRunnerServiceClient struct { + *brokerExt + grpc pb.PipelineRunnerServiceClient +} + +func (p pipelineRunnerServiceClient) ExecuteRun(ctx context.Context, spec types.Spec, vars types.Vars, l logger.Logger) (*types.Run, types.TaskRunResults, error) { + varsStruct, err := structpb.NewStruct(vars.Vars) + if err != nil { + return nil, nil, err + } + + rr := pb.RunRequest{ + Id: spec.ID, + Source: spec.DotDagSource, + Vars: varsStruct, + Options: &pb.Options{ + PersistErroredRuns: false, + MaxTaskDuration: durationpb.New(spec.MaxTaskDuration), + GasLimit: *spec.GasLimit, + ForwardingAllowed: spec.ForwardingAllowed, + }, + Job: &pb.Job{ + Id: spec.JobID, + Name: spec.JobName, + Type: spec.JobType, + }, + } + + executeRunResult, err := p.grpc.ExecuteRun(ctx, &rr) + if err != nil { + return nil, nil, err + } + + trrs := make([]types.TaskRunResult, len(executeRunResult.Results)) + + for i, trr := range executeRunResult.Results { + id, err := uuid.FromString(trr.Id) + if err != nil { + return nil, nil, err + } + + trrs[i] = types.TaskRunResult{ + ID: uuid3.UUID(id), + Task: nil, + TaskRun: types.TaskRun{}, + Result: types.Result{}, + Attempts: 0, + CreatedAt: time.Time{}, + FinishedAt: null.Time{}, + RunInfo: types.RunInfo{}, + } + } + + return nil, trrs, nil +} + +var _ pb.PipelineRunnerServiceServer = (*pipelineRunnerServiceServer)(nil) + +type pipelineRunnerServiceServer struct { + pb.UnimplementedPipelineRunnerServiceServer + *brokerExt + + impl PipelineRunner +} + +func (p *pipelineRunnerServiceServer) ExecuteRun(ctx context.Context, rr *pb.RunRequest) (*pb.RunResponse, error) { + + spec := types.Spec{ + ID: rr.Id, + DotDagSource: rr.Source, + CreatedAt: time.Now(), + MaxTaskDuration: rr.Options.MaxTaskDuration.AsDuration(), + GasLimit: &rr.Options.GasLimit, + ForwardingAllowed: rr.Options.ForwardingAllowed, + JobID: rr.Job.Id, + JobName: rr.Job.Name, + JobType: rr.Job.Type, + } + vars := types.Vars{Vars: rr.Vars.AsMap()} + + _, trrs, err := p.impl.ExecuteRun(ctx, spec, vars, nil) // Need logger + if err != nil { + return nil, err + } + + taskResults := make([]*pb.TaskResult, len(trrs)) + + for i, trr := range trrs { + v, err := structpb.NewValue(trr.Result.Value) + if err != nil { + return nil, err + } + + taskResults[i] = &pb.TaskResult{ + Id: trr.ID.String(), + Type: trr.TaskRun.Type, + Value: v, + Index: 0, // TODO: figure out what this is, + } + } + + return &pb.RunResponse{ + Results: taskResults, + }, nil + +} diff --git a/pkg/loop/internal/types.go b/pkg/loop/internal/types.go index f47648472..3f53b415f 100644 --- a/pkg/loop/internal/types.go +++ b/pkg/loop/internal/types.go @@ -3,6 +3,7 @@ package internal import ( "context" + "github.com/smartcontractkit/chainlink-relay/pkg/logger" "github.com/smartcontractkit/chainlink-relay/pkg/types" ) @@ -29,3 +30,7 @@ type Relayer interface { NewConfigProvider(context.Context, types.RelayArgs) (types.ConfigProvider, error) NewPluginProvider(context.Context, types.RelayArgs, types.PluginArgs) (types.PluginProvider, error) } + +type PipelineRunner interface { + ExecuteRun(ctx context.Context, spec types.Spec, vars types.Vars, l logger.Logger) (run *types.Run, trrs types.TaskRunResults, err error) +} diff --git a/pkg/types/pipeline_runner.go b/pkg/types/pipeline_runner.go new file mode 100644 index 000000000..032878d53 --- /dev/null +++ b/pkg/types/pipeline_runner.go @@ -0,0 +1,85 @@ +package types + +import ( + "time" + + "github.com/google/uuid" + "gopkg.in/guregu/null.v4" +) + +type Vars struct { + Vars map[string]interface{} +} +type TaskRunResults []TaskRunResult +type TaskRunResult struct { + ID uuid.UUID + Task Task + TaskRun TaskRun + Result Result + Attempts uint + CreatedAt time.Time + FinishedAt null.Time + RunInfo RunInfo +} + +type Task interface{} +type TaskRun struct { + Type string + CreatedAt time.Time + FinishedAt time.Time + Output string + Error interface{} + DotID string +} + +type Result struct { + Value interface{} + Error error +} +type RunInfo struct { + IsRetryable bool + IsPending bool +} + +type Spec struct { + ID int32 + DotDagSource string + CreatedAt time.Time + MaxTaskDuration time.Duration + GasLimit *uint32 + ForwardingAllowed bool + + JobID int32 + JobName string + JobType string +} + +type SpecData struct { + ID string +} + +type Run struct { + ID int64 + PipelineSpecID int32 + PipelineSpec Spec + Meta JSONSerializable + AllErrors RunErrors + FatalErrors RunErrors + Inputs JSONSerializable + Outputs JSONSerializable + CreatedAt time.Time + FinishedAt null.Time + PipelineTaskRuns []TaskRun + State RunStatus + + Pending bool + FailSilently bool +} + +type JSONSerializable struct { + Val interface{} + Valid bool +} + +type RunErrors []null.String +type RunStatus string