Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce job spec flag for custom reverted pipeline #11529

Merged
merged 4 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ func TestORM_CreateJob_VRFV2(t *testing.T) {
var batchFulfillmentEnabled bool
require.NoError(t, db.Get(&batchFulfillmentEnabled, `SELECT batch_fulfillment_enabled FROM vrf_specs LIMIT 1`))
require.False(t, batchFulfillmentEnabled)
var customRevertsPipelineEnabled bool
require.NoError(t, db.Get(&customRevertsPipelineEnabled, `SELECT custom_reverts_pipeline_enabled FROM vrf_specs LIMIT 1`))
require.False(t, customRevertsPipelineEnabled)
var batchFulfillmentGasMultiplier float64
require.NoError(t, db.Get(&batchFulfillmentGasMultiplier, `SELECT batch_fulfillment_gas_multiplier FROM vrf_specs LIMIT 1`))
require.Equal(t, float64(1.0), batchFulfillmentGasMultiplier)
Expand Down Expand Up @@ -514,13 +517,14 @@ func TestORM_CreateJob_VRFV2Plus(t *testing.T) {
fromAddresses := []string{cltest.NewEIP55Address().String(), cltest.NewEIP55Address().String()}
jb, err := vrfcommon.ValidatedVRFSpec(testspecs.GenerateVRFSpec(
testspecs.VRFSpecParams{
VRFVersion: vrfcommon.V2Plus,
RequestedConfsDelay: 10,
FromAddresses: fromAddresses,
ChunkSize: 25,
BackoffInitialDelay: time.Minute,
BackoffMaxDelay: time.Hour,
GasLanePrice: assets.GWei(100),
VRFVersion: vrfcommon.V2Plus,
RequestedConfsDelay: 10,
FromAddresses: fromAddresses,
ChunkSize: 25,
BackoffInitialDelay: time.Minute,
BackoffMaxDelay: time.Hour,
GasLanePrice: assets.GWei(100),
CustomRevertsPipelineEnabled: true,
}).
Toml())
require.NoError(t, err)
Expand All @@ -534,6 +538,9 @@ func TestORM_CreateJob_VRFV2Plus(t *testing.T) {
var batchFulfillmentEnabled bool
require.NoError(t, db.Get(&batchFulfillmentEnabled, `SELECT batch_fulfillment_enabled FROM vrf_specs LIMIT 1`))
require.False(t, batchFulfillmentEnabled)
var customRevertsPipelineEnabled bool
require.NoError(t, db.Get(&customRevertsPipelineEnabled, `SELECT custom_reverts_pipeline_enabled FROM vrf_specs LIMIT 1`))
require.True(t, customRevertsPipelineEnabled)
var batchFulfillmentGasMultiplier float64
require.NoError(t, db.Get(&batchFulfillmentGasMultiplier, `SELECT batch_fulfillment_gas_multiplier FROM vrf_specs LIMIT 1`))
require.Equal(t, float64(1.0), batchFulfillmentGasMultiplier)
Expand Down
3 changes: 3 additions & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,9 @@ type VRFSpec struct {
// for fulfilling requests. If set to true, batchCoordinatorAddress must be set in
// the job spec.
BatchFulfillmentEnabled bool `toml:"batchFulfillmentEnabled"`
// CustomRevertsPipelineEnabled indicates to the vrf job to run the
// custom reverted txns pipeline along with VRF listener
CustomRevertsPipelineEnabled bool `toml:"customRevertsPipelineEnabled"`
// BatchFulfillmentGasMultiplier is used to determine the final gas estimate for the batch
// fulfillment.
BatchFulfillmentGasMultiplier tomlutils.Float64 `toml:"batchFulfillmentGasMultiplier"`
Expand Down
4 changes: 2 additions & 2 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,14 +326,14 @@ func (o *orm) CreateJob(jb *Job, qopts ...pg.QOpt) error {
evm_chain_id, from_addresses, poll_period, requested_confs_delay,
request_timeout, chunk_size, batch_coordinator_address, batch_fulfillment_enabled,
batch_fulfillment_gas_multiplier, backoff_initial_delay, backoff_max_delay, gas_lane_price,
vrf_owner_address,
vrf_owner_address, custom_reverts_pipeline_enabled,
created_at, updated_at)
VALUES (
:coordinator_address, :public_key, :min_incoming_confirmations,
:evm_chain_id, :from_addresses, :poll_period, :requested_confs_delay,
:request_timeout, :chunk_size, :batch_coordinator_address, :batch_fulfillment_enabled,
:batch_fulfillment_gas_multiplier, :backoff_initial_delay, :backoff_max_delay, :gas_lane_price,
:vrf_owner_address,
:vrf_owner_address, :custom_reverts_pipeline_enabled,
NOW(), NOW())
RETURNING id;`

Expand Down
1 change: 1 addition & 0 deletions core/services/job/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func ValidateSpec(ts string) (Type, error) {
if jb.Pipeline.RequiresPreInsert() && !jb.Type.SupportsAsync() {
return "", errors.Errorf("async=true tasks are not supported for %v", jb.Type)
}
// spec.CustomRevertsPipelineEnabled == false, default is custom reverted txns pipeline disabled

if strings.Contains(ts, "<{}>") {
return "", errors.Errorf("'<{}>' syntax is not supported. Please use \"{}\" instead")
Expand Down
3 changes: 3 additions & 0 deletions core/services/vrf/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
if vrfOwner != nil {
return nil, errors.New("VRF Owner is not supported for VRF V2 Plus")
}
if jb.VRFSpec.CustomRevertsPipelineEnabled {
return nil, errors.New("Custom Reverted Txns Pipeline is not supported for VRF V2 Plus")
}

// Get the LINKNATIVEFEED address with retries
// This is needed because the RPC endpoint may be down so we need to
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- +goose Up
ALTER TABLE vrf_specs ADD COLUMN custom_reverts_pipeline_enabled boolean DEFAULT FALSE NOT NULL;

-- +goose Down
ALTER TABLE vrf_specs DROP COLUMN custom_reverts_pipeline_enabled;
3 changes: 3 additions & 0 deletions core/testdata/testspecs/v2_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ type VRFSpecParams struct {
BatchCoordinatorAddress string
VRFOwnerAddress string
BatchFulfillmentEnabled bool
CustomRevertsPipelineEnabled bool
BatchFulfillmentGasMultiplier float64
MinIncomingConfirmations int
FromAddresses []string
Expand Down Expand Up @@ -403,6 +404,7 @@ evmChainID = "%s"
batchCoordinatorAddress = "%s"
batchFulfillmentEnabled = %v
batchFulfillmentGasMultiplier = %s
customRevertsPipelineEnabled = %v
minIncomingConfirmations = %d
requestedConfsDelay = %d
requestTimeout = "%s"
Expand All @@ -419,6 +421,7 @@ observationSource = """
toml := fmt.Sprintf(template,
jobID, name, coordinatorAddress, params.EVMChainID, batchCoordinatorAddress,
params.BatchFulfillmentEnabled, strconv.FormatFloat(batchFulfillmentGasMultiplier, 'f', 2, 64),
params.CustomRevertsPipelineEnabled,
confirmations, params.RequestedConfsDelay, requestTimeout.String(), publicKey, chunkSize,
params.BackoffInitialDelay.String(), params.BackoffMaxDelay.String(), gasLanePrice.String(),
pollPeriod.String(), observationSource)
Expand Down
38 changes: 22 additions & 16 deletions core/web/presenters/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func NewCronSpec(spec *job.CronSpec) *CronSpec {
type VRFSpec struct {
BatchCoordinatorAddress *ethkey.EIP55Address `json:"batchCoordinatorAddress"`
BatchFulfillmentEnabled bool `json:"batchFulfillmentEnabled"`
CustomRevertsPipelineEnabled *bool `json:"customRevertsPipelineEnabled,omitempty"`
BatchFulfillmentGasMultiplier float64 `json:"batchFulfillmentGasMultiplier"`
CoordinatorAddress ethkey.EIP55Address `json:"coordinatorAddress"`
PublicKey secp256k1.PublicKey `json:"publicKey"`
Expand All @@ -281,26 +282,31 @@ type VRFSpec struct {
BackoffInitialDelay models.Duration `json:"backoffInitialDelay"`
BackoffMaxDelay models.Duration `json:"backoffMaxDelay"`
GasLanePrice *assets.Wei `json:"gasLanePrice"`
VRFOwnerAddress *ethkey.EIP55Address `json:"vrfOwnerAddress"`
RequestedConfsDelay int64 `json:"requestedConfsDelay"`
VRFOwnerAddress *ethkey.EIP55Address `json:"vrfOwnerAddress,omitempty"`
}

func NewVRFSpec(spec *job.VRFSpec) *VRFSpec {
return &VRFSpec{
BatchCoordinatorAddress: spec.BatchCoordinatorAddress,
BatchFulfillmentEnabled: spec.BatchFulfillmentEnabled,
CoordinatorAddress: spec.CoordinatorAddress,
PublicKey: spec.PublicKey,
FromAddresses: spec.FromAddresses,
PollPeriod: models.MustMakeDuration(spec.PollPeriod),
MinIncomingConfirmations: spec.MinIncomingConfirmations,
CreatedAt: spec.CreatedAt,
UpdatedAt: spec.UpdatedAt,
EVMChainID: spec.EVMChainID,
ChunkSize: spec.ChunkSize,
RequestTimeout: models.MustMakeDuration(spec.RequestTimeout),
BackoffInitialDelay: models.MustMakeDuration(spec.BackoffInitialDelay),
BackoffMaxDelay: models.MustMakeDuration(spec.BackoffMaxDelay),
GasLanePrice: spec.GasLanePrice,
BatchCoordinatorAddress: spec.BatchCoordinatorAddress,
BatchFulfillmentEnabled: spec.BatchFulfillmentEnabled,
BatchFulfillmentGasMultiplier: float64(spec.BatchFulfillmentGasMultiplier),
CustomRevertsPipelineEnabled: &spec.CustomRevertsPipelineEnabled,
CoordinatorAddress: spec.CoordinatorAddress,
PublicKey: spec.PublicKey,
FromAddresses: spec.FromAddresses,
PollPeriod: models.MustMakeDuration(spec.PollPeriod),
MinIncomingConfirmations: spec.MinIncomingConfirmations,
CreatedAt: spec.CreatedAt,
UpdatedAt: spec.UpdatedAt,
EVMChainID: spec.EVMChainID,
ChunkSize: spec.ChunkSize,
RequestTimeout: models.MustMakeDuration(spec.RequestTimeout),
BackoffInitialDelay: models.MustMakeDuration(spec.BackoffInitialDelay),
BackoffMaxDelay: models.MustMakeDuration(spec.BackoffMaxDelay),
GasLanePrice: spec.GasLanePrice,
RequestedConfsDelay: spec.RequestedConfsDelay,
VRFOwnerAddress: spec.VRFOwnerAddress,
}
}

Expand Down
87 changes: 87 additions & 0 deletions core/web/presenters/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-common/pkg/assets"
evmassets "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
clnull "github.com/smartcontractkit/chainlink/v2/core/null"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/signatures/secp256k1"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
"github.com/smartcontractkit/chainlink/v2/core/web/presenters"
)
Expand Down Expand Up @@ -58,6 +60,7 @@ func TestJob(t *testing.T) {
trustedBlockhashStoreBatchSize := int32(20)

var specGasLimit uint32 = 1000
vrfPubKey, _ := secp256k1.NewPublicKeyFromHex("0xede539e216e3a50e69d1c68aa9cc472085876c4002f6e1e6afee0ea63b50a78b00")

testCases := []struct {
name string
Expand Down Expand Up @@ -469,6 +472,90 @@ func TestJob(t *testing.T) {
}
}`,
},
{
name: "vrf job spec",
job: job.Job{
ID: 1,
Name: null.StringFrom("vrf_test"),
Type: job.VRF,
SchemaVersion: 1,
ExternalJobID: uuid.MustParse("0eec7e1d-d0d2-476c-a1a8-72dfb6633f47"),
VRFSpec: &job.VRFSpec{
BatchCoordinatorAddress: &contractAddress,
BatchFulfillmentEnabled: true,
CustomRevertsPipelineEnabled: true,
MinIncomingConfirmations: 1,
CoordinatorAddress: contractAddress,
CreatedAt: timestamp,
UpdatedAt: timestamp,
EVMChainID: evmChainID,
FromAddresses: []ethkey.EIP55Address{fromAddress},
PublicKey: vrfPubKey,
RequestedConfsDelay: 10,
ChunkSize: 25,
BatchFulfillmentGasMultiplier: 1,
GasLanePrice: evmassets.GWei(200),
VRFOwnerAddress: nil,
},
PipelineSpec: &pipeline.Spec{
ID: 1,
DotDagSource: "",
},
},
want: fmt.Sprintf(`
{
"data": {
"type": "jobs",
"id": "1",
"attributes": {
"name": "vrf_test",
"type": "vrf",
"schemaVersion": 1,
"maxTaskDuration": "0s",
"externalJobID": "0eec7e1d-d0d2-476c-a1a8-72dfb6633f47",
"directRequestSpec": null,
"fluxMonitorSpec": null,
"gasLimit": null,
"forwardingAllowed": false,
"cronSpec": null,
"offChainReportingOracleSpec": null,
"offChainReporting2OracleSpec": null,
"keeperSpec": null,
"vrfSpec": {
"batchCoordinatorAddress": "%s",
"batchFulfillmentEnabled": true,
"customRevertsPipelineEnabled": true,
"confirmations": 1,
"coordinatorAddress": "%s",
"createdAt": "2000-01-01T00:00:00Z",
"updatedAt": "2000-01-01T00:00:00Z",
"evmChainID": "42",
"fromAddresses": ["%s"],
"pollPeriod": "0s",
"publicKey": "%s",
"requestedConfsDelay": 10,
"requestTimeout": "0s",
"chunkSize": 25,
"batchFulfillmentGasMultiplier": 1,
"backoffInitialDelay": "0s",
"backoffMaxDelay": "0s",
"gasLanePrice": "200 gwei"
},
"webhookSpec": null,
"blockhashStoreSpec": null,
"blockHeaderFeederSpec": null,
"bootstrapSpec": null,
"pipelineSpec": {
"id": 1,
"jobID": 0,
"dotDagSource": ""
},
"gatewaySpec": null,
"errors": []
}
}
}`, contractAddress, contractAddress, fromAddress, vrfPubKey.String()),
},
{
name: "blockhash store spec",
job: job.Job{
Expand Down
5 changes: 5 additions & 0 deletions core/web/resolver/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,11 @@ func (r *VRFSpecResolver) BatchFulfillmentGasMultiplier() float64 {
return float64(r.spec.BatchFulfillmentGasMultiplier)
}

// CustomRevertsPipelineEnabled resolves the spec's custom reverts pipeline enabled flag.
func (r *VRFSpecResolver) CustomRevertsPipelineEnabled() *bool {
return &r.spec.CustomRevertsPipelineEnabled
}

// ChunkSize resolves the spec's chunk size.
func (r *VRFSpecResolver) ChunkSize() int32 {
return int32(r.spec.ChunkSize)
Expand Down
3 changes: 3 additions & 0 deletions core/web/resolver/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ func TestResolver_VRFSpec(t *testing.T) {
VRFSpec: &job.VRFSpec{
BatchCoordinatorAddress: &batchCoordinatorAddress,
BatchFulfillmentEnabled: true,
CustomRevertsPipelineEnabled: true,
MinIncomingConfirmations: 1,
CoordinatorAddress: coordinatorAddress,
CreatedAt: f.Timestamp(),
Expand Down Expand Up @@ -617,6 +618,7 @@ func TestResolver_VRFSpec(t *testing.T) {
batchCoordinatorAddress
batchFulfillmentEnabled
batchFulfillmentGasMultiplier
customRevertsPipelineEnabled
chunkSize
backoffInitialDelay
backoffMaxDelay
Expand Down Expand Up @@ -644,6 +646,7 @@ func TestResolver_VRFSpec(t *testing.T) {
"batchCoordinatorAddress": "0x0ad9FE7a58216242a8475ca92F222b0640E26B63",
"batchFulfillmentEnabled": true,
"batchFulfillmentGasMultiplier": 1,
"customRevertsPipelineEnabled": true,
"chunkSize": 25,
"backoffInitialDelay": "1m0s",
"backoffMaxDelay": "1h0m0s",
Expand Down
1 change: 1 addition & 0 deletions core/web/schema/type/spec.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type VRFSpec {
batchCoordinatorAddress: String
batchFulfillmentEnabled: Boolean!
batchFulfillmentGasMultiplier: Float!
customRevertsPipelineEnabled: Boolean
chunkSize: Int!
backoffInitialDelay: String!
backoffMaxDelay: String!
Expand Down
Loading