From 79d8053f2fc6684ec888508c302a52bc05fb3450 Mon Sep 17 00:00:00 2001 From: Simon Dudley Date: Fri, 8 Sep 2023 13:38:43 +1000 Subject: [PATCH] Add high watermark to slashing database (#896) * New metadata columns: high_watermark_epoch and high_watermark_slot * CRUD operations in MetadataDao - find - update (assumes GVR metadata is already inserted) - delete * Add PL/pgSQL trigger as constraints for checking high_watermarks are greater than low_watermarks * Add PL/pgSQL trigger as constraints for checking low_watermarks are less than or equal to high_watermarks --- .../PruningIntegrationTest.java | 26 +++ .../dao/DatabaseVersionDao.java | 2 +- .../slashingprotection/dao/HighWatermark.java | 60 +++++++ .../slashingprotection/dao/MetadataDao.java | 28 +++ .../V00012__add_highwatermark_metadata.sql | 63 +++++++ .../dao/LowWatermarkDaoTest.java | 93 ++++++++++ .../dao/MetadataDaoTest.java | 163 ++++++++++++++++++ 7 files changed, 434 insertions(+), 1 deletion(-) create mode 100644 slashing-protection/src/main/java/tech/pegasys/web3signer/slashingprotection/dao/HighWatermark.java create mode 100644 slashing-protection/src/main/resources/migrations/postgresql/V00012__add_highwatermark_metadata.sql diff --git a/slashing-protection/src/integration-test/java/tech/pegasys/web3signer/slashingprotection/PruningIntegrationTest.java b/slashing-protection/src/integration-test/java/tech/pegasys/web3signer/slashingprotection/PruningIntegrationTest.java index 60c0db576..a84620d38 100644 --- a/slashing-protection/src/integration-test/java/tech/pegasys/web3signer/slashingprotection/PruningIntegrationTest.java +++ b/slashing-protection/src/integration-test/java/tech/pegasys/web3signer/slashingprotection/PruningIntegrationTest.java @@ -16,6 +16,8 @@ import static db.DatabaseUtil.USERNAME; import static org.assertj.core.api.Assertions.assertThat; +import tech.pegasys.web3signer.slashingprotection.dao.HighWatermark; +import tech.pegasys.web3signer.slashingprotection.dao.MetadataDao; import tech.pegasys.web3signer.slashingprotection.dao.SignedAttestation; import tech.pegasys.web3signer.slashingprotection.dao.SignedBlock; import tech.pegasys.web3signer.slashingprotection.dao.SigningWatermark; @@ -118,6 +120,30 @@ void watermarkIsNotMovedLower() { assertThat(getWatermark(1).getSlot()).isEqualTo(UInt64.valueOf(8)); } + @Test + void lowWatermarkCanMoveToEqualHighWatermark() { + // in the extreme case where we only keep 1 epoch, the low watermark may move to match the high + // watermark + final SlashingProtectionContext slashingProtectionContext = + SlashingProtectionContextFactory.create( + new TestSlashingProtectionParameters(databaseUrl, USERNAME, PASSWORD, 1, 1)); + insertValidatorAndCreateSlashingData( + slashingProtectionContext.getRegisteredValidators(), 10, 10, 1); + MetadataDao metadataDao = new MetadataDao(); + jdbi.useTransaction( + h -> { + lowWatermarkDao.updateSlotWatermarkFor(h, 1, UInt64.valueOf(8)); + lowWatermarkDao.updateEpochWatermarksFor(h, 1, UInt64.valueOf(8), UInt64.valueOf(8)); + metadataDao.updateHighWatermark( + h, new HighWatermark(UInt64.valueOf(9), UInt64.valueOf(9))); + }); + slashingProtectionContext.getPruner().prune(); + + assertThat(fetchAttestations(1)).hasSize(1); + assertThat(fetchBlocks(1)).hasSize(1); + assertThat(getWatermark(1).getSlot()).isEqualTo(UInt64.valueOf(9)); + } + @Test void noPruningOccursWhenThereIsNoWatermark() { final SlashingProtectionContext slashingProtectionContext = diff --git a/slashing-protection/src/main/java/tech/pegasys/web3signer/slashingprotection/dao/DatabaseVersionDao.java b/slashing-protection/src/main/java/tech/pegasys/web3signer/slashingprotection/dao/DatabaseVersionDao.java index 268639d5d..f3645fc01 100644 --- a/slashing-protection/src/main/java/tech/pegasys/web3signer/slashingprotection/dao/DatabaseVersionDao.java +++ b/slashing-protection/src/main/java/tech/pegasys/web3signer/slashingprotection/dao/DatabaseVersionDao.java @@ -15,7 +15,7 @@ import org.jdbi.v3.core.Handle; public class DatabaseVersionDao { - public static final int EXPECTED_DATABASE_VERSION = 11; + public static final int EXPECTED_DATABASE_VERSION = 12; public static final int VALIDATOR_ENABLE_FLAG_VERSION = 10; public Integer findDatabaseVersion(final Handle handle) { diff --git a/slashing-protection/src/main/java/tech/pegasys/web3signer/slashingprotection/dao/HighWatermark.java b/slashing-protection/src/main/java/tech/pegasys/web3signer/slashingprotection/dao/HighWatermark.java new file mode 100644 index 000000000..aa81a8665 --- /dev/null +++ b/slashing-protection/src/main/java/tech/pegasys/web3signer/slashingprotection/dao/HighWatermark.java @@ -0,0 +1,60 @@ +/* + * Copyright 2023 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.web3signer.slashingprotection.dao; + +import java.util.Objects; + +import org.apache.tuweni.units.bigints.UInt64; + +public class HighWatermark { + + private UInt64 slot; + private UInt64 epoch; + + // needed for JDBI + public HighWatermark() {} + + public HighWatermark(final UInt64 slot, final UInt64 epoch) { + this.slot = slot; + this.epoch = epoch; + } + + public UInt64 getSlot() { + return slot; + } + + public UInt64 getEpoch() { + return epoch; + } + + public void setSlot(final UInt64 slot) { + this.slot = slot; + } + + public void setEpoch(final UInt64 epoch) { + this.epoch = epoch; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + HighWatermark that = (HighWatermark) o; + return Objects.equals(slot, that.slot) && Objects.equals(epoch, that.epoch); + } + + @Override + public int hashCode() { + return Objects.hash(slot, epoch); + } +} diff --git a/slashing-protection/src/main/java/tech/pegasys/web3signer/slashingprotection/dao/MetadataDao.java b/slashing-protection/src/main/java/tech/pegasys/web3signer/slashingprotection/dao/MetadataDao.java index e9b61bdd2..a5a5864fa 100644 --- a/slashing-protection/src/main/java/tech/pegasys/web3signer/slashingprotection/dao/MetadataDao.java +++ b/slashing-protection/src/main/java/tech/pegasys/web3signer/slashingprotection/dao/MetadataDao.java @@ -36,4 +36,32 @@ public void insertGenesisValidatorsRoot( .bind(1, genesisValidatorsRoot) .execute(); } + + public Optional findHighWatermark(Handle handle) { + return handle + .createQuery( + "SELECT high_watermark_epoch as epoch, high_watermark_slot as slot FROM metadata WHERE id = ?") + .bind(0, METADATA_ROW_ID) + .mapToBean(HighWatermark.class) + .filter(h -> h.getEpoch() != null && h.getSlot() != null) + .findFirst(); + } + + public int updateHighWatermark(final Handle handle, final HighWatermark highWatermark) { + return handle + .createUpdate( + "UPDATE metadata set high_watermark_epoch=:epoch, high_watermark_slot=:slot WHERE id =:id") + .bind("id", METADATA_ROW_ID) + .bind("epoch", highWatermark.getEpoch()) + .bind("slot", highWatermark.getSlot()) + .execute(); + } + + public void deleteHighWatermark(final Handle handle) { + handle + .createUpdate( + "UPDATE metadata set high_watermark_epoch=NULL, high_watermark_slot=NULL WHERE id =:id") + .bind("id", METADATA_ROW_ID) + .execute(); + } } diff --git a/slashing-protection/src/main/resources/migrations/postgresql/V00012__add_highwatermark_metadata.sql b/slashing-protection/src/main/resources/migrations/postgresql/V00012__add_highwatermark_metadata.sql new file mode 100644 index 000000000..0b46fc2f9 --- /dev/null +++ b/slashing-protection/src/main/resources/migrations/postgresql/V00012__add_highwatermark_metadata.sql @@ -0,0 +1,63 @@ +ALTER TABLE metadata + ADD COLUMN high_watermark_epoch NUMERIC(20), + ADD COLUMN high_watermark_slot NUMERIC(20); + +-- inserted high watermark should be above low watermark + +CREATE OR REPLACE FUNCTION check_high_watermarks() RETURNS TRIGGER AS $$ +DECLARE + max_slot NUMERIC(20); + max_epoch NUMERIC(20); +BEGIN +SELECT MAX(slot) INTO max_slot FROM low_watermarks; +SELECT GREATEST(MAX(target_epoch), MAX(source_epoch)) INTO max_epoch FROM low_watermarks; + +IF NEW.high_watermark_slot <= max_slot THEN + RAISE EXCEPTION 'Insert/Update violates constraint: high_watermark_slot must be greater than max slot in low_watermarks table'; +END IF; + +IF NEW.high_watermark_epoch <= max_epoch THEN + RAISE EXCEPTION 'Insert/Update violates constraint: high_watermark_epoch must be greater than max epoch in low_watermarks table'; +END IF; + +RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER check_before_insert_or_update_high_watermarks + BEFORE INSERT OR UPDATE ON metadata + FOR EACH ROW EXECUTE PROCEDURE check_high_watermarks(); + + +-- inserted low watermark should be below or the same as high watermark + +CREATE OR REPLACE FUNCTION check_low_watermarks() RETURNS TRIGGER AS $$ +DECLARE + high_slot NUMERIC(20); + high_epoch NUMERIC(20); +BEGIN +SELECT MIN(high_watermark_slot) INTO high_slot FROM metadata; +SELECT MIN(high_watermark_epoch) INTO high_epoch FROM metadata; + +IF NEW.slot > high_slot THEN + RAISE EXCEPTION 'Insert/Update violates constraint: low_watermark slot must be less than or equal to high_watermark_slot in the metadata table'; +END IF; + +IF NEW.source_epoch > high_epoch THEN + RAISE EXCEPTION 'Insert/Update violates constraint: low_watermark source epoch must be less than or equal to high_watermark_epoch in the metadata table'; +END IF; + +IF NEW.target_epoch > high_epoch THEN + RAISE EXCEPTION 'Insert/Update violates constraint: low_watermark target epoch must be less than or equal to high_watermark_epoch in the metadata table'; +END IF; + +RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER check_before_insert_or_update_low_watermarks + BEFORE INSERT OR UPDATE ON low_watermarks + FOR EACH ROW EXECUTE PROCEDURE check_low_watermarks(); + + +UPDATE database_version SET version = 12 WHERE id = 1; \ No newline at end of file diff --git a/slashing-protection/src/test/java/tech/pegasys/web3signer/slashingprotection/dao/LowWatermarkDaoTest.java b/slashing-protection/src/test/java/tech/pegasys/web3signer/slashingprotection/dao/LowWatermarkDaoTest.java index bd57aebe3..4bcaab790 100644 --- a/slashing-protection/src/test/java/tech/pegasys/web3signer/slashingprotection/dao/LowWatermarkDaoTest.java +++ b/slashing-protection/src/test/java/tech/pegasys/web3signer/slashingprotection/dao/LowWatermarkDaoTest.java @@ -19,6 +19,7 @@ import db.DatabaseSetupExtension; import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.units.bigints.UInt64; import org.jdbi.v3.core.Handle; import org.jdbi.v3.core.statement.UnableToExecuteStatementException; @@ -200,7 +201,99 @@ public void canUpdateAttestationWatermarkAfterBlockWatermark(final Handle handle assertThat(watermark.get().getTargetEpoch()).isEqualTo(UInt64.valueOf(5)); } + @Test + public void canCreateLowWatermarkSlotIfLessThanOrEqualToHighWatermarkSlot(final Handle handle) { + insertValidator(handle, Bytes.of(100), 1); + UInt64 slot = UInt64.valueOf(2); + updateHighWatermark(handle, UInt64.MAX_VALUE, slot); + + assertThat(lowWatermarkDao.findLowWatermarkForValidator(handle, 1)).isEmpty(); + + lowWatermarkDao.updateSlotWatermarkFor(handle, 1, slot); + + Optional watermark = lowWatermarkDao.findLowWatermarkForValidator(handle, 1); + assertThat(watermark).isNotEmpty(); + assertThat(watermark.get().getSlot()).isEqualTo(UInt64.valueOf(2)); + } + + @Test + public void canCreateLowWatermarkSourceEpochIfLessThanOrEqualToHighWatermarkEpoch( + final Handle handle) { + insertValidator(handle, Bytes.of(100), 1); + UInt64 sourceEpoch = UInt64.valueOf(2); + UInt64 targetEpoch = UInt64.valueOf(2); + updateHighWatermark(handle, sourceEpoch, UInt64.MAX_VALUE); + + lowWatermarkDao.updateEpochWatermarksFor(handle, 1, sourceEpoch, targetEpoch); + + Optional watermark = lowWatermarkDao.findLowWatermarkForValidator(handle, 1); + assertThat(watermark.get().getSourceEpoch()).isEqualTo(sourceEpoch); + } + + @Test + public void canCreateLowWatermarkTargetEpochIfLessThanOrEqualToHighWatermarkEpoch( + final Handle handle) { + insertValidator(handle, Bytes.of(100), 1); + UInt64 sourceEpoch = UInt64.valueOf(2); + UInt64 targetEpoch = UInt64.valueOf(3); + updateHighWatermark(handle, targetEpoch, UInt64.MAX_VALUE); + + lowWatermarkDao.updateEpochWatermarksFor(handle, 1, sourceEpoch, targetEpoch); + + Optional watermark = lowWatermarkDao.findLowWatermarkForValidator(handle, 1); + assertThat(watermark.get().getTargetEpoch()).isEqualTo(targetEpoch); + } + + @Test + public void cannotCreateLowWatermarkSlotIfGreaterThanHighWatermarkSlot(final Handle handle) { + insertValidator(handle, Bytes.of(100), 1); + UInt64 slot = UInt64.valueOf(3); + + updateHighWatermark(handle, UInt64.MAX_VALUE, slot.subtract(1L)); + + assertThatThrownBy(() -> lowWatermarkDao.updateSlotWatermarkFor(handle, 1, slot)) + .hasMessageContaining( + "low_watermark slot must be less than or equal to high_watermark_slot in the metadata table"); + } + + @Test + public void cannotCreateLowWatermarkSourceEpochIfGreaterThanHighWatermarkEpoch( + final Handle handle) { + insertValidator(handle, Bytes.of(100), 1); + UInt64 sourceEpoch = UInt64.valueOf(3); + UInt64 targetEpoch = UInt64.valueOf(3); + updateHighWatermark(handle, sourceEpoch.subtract(1L), UInt64.MAX_VALUE); + + assertThatThrownBy( + () -> lowWatermarkDao.updateEpochWatermarksFor(handle, 1, sourceEpoch, targetEpoch)) + .hasMessageContaining( + "low_watermark source epoch must be less than or equal to high_watermark_epoch in the metadata table"); + } + + @Test + public void cannotCreateLowWatermarkTargetEpochIfGreaterThanHighWatermarkEpoch( + final Handle handle) { + insertValidator(handle, Bytes.of(100), 1); + UInt64 sourceEpoch = UInt64.valueOf(2); + UInt64 targetEpoch = UInt64.valueOf(3); + updateHighWatermark(handle, targetEpoch.subtract(1L), UInt64.MAX_VALUE); + + assertThatThrownBy( + () -> lowWatermarkDao.updateEpochWatermarksFor(handle, 1, sourceEpoch, targetEpoch)) + .hasMessageContaining( + "low_watermark target epoch must be less than or equal to high_watermark_epoch in the metadata table"); + } + private void insertValidator(final Handle handle, final Bytes publicKey, final int validatorId) { handle.execute("INSERT INTO validators (id, public_key) VALUES (?, ?)", validatorId, publicKey); } + + private void updateHighWatermark(final Handle handle, final UInt64 epoch, final UInt64 slot) { + handle.execute( + "INSERT INTO metadata (id, genesis_validators_root, high_watermark_epoch, high_watermark_slot) VALUES (?, ?, ?, ?)", + 1, + Bytes32.leftPad(Bytes.of(3)), + epoch, + slot); + } } diff --git a/slashing-protection/src/test/java/tech/pegasys/web3signer/slashingprotection/dao/MetadataDaoTest.java b/slashing-protection/src/test/java/tech/pegasys/web3signer/slashingprotection/dao/MetadataDaoTest.java index 3a3842758..15e268f69 100644 --- a/slashing-protection/src/test/java/tech/pegasys/web3signer/slashingprotection/dao/MetadataDaoTest.java +++ b/slashing-protection/src/test/java/tech/pegasys/web3signer/slashingprotection/dao/MetadataDaoTest.java @@ -21,6 +21,7 @@ import db.DatabaseSetupExtension; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; +import org.apache.tuweni.units.bigints.UInt64; import org.jdbi.v3.core.Handle; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -28,6 +29,9 @@ @ExtendWith(DatabaseSetupExtension.class) public class MetadataDaoTest { private final MetadataDao metadataDao = new MetadataDao(); + private static final UInt64 MAX_LOW_WATERMARK_SLOT = UInt64.valueOf(13); + private static final UInt64 MAX_LOW_WATERMARK_SOURCE_EPOCH = UInt64.valueOf(11); + private static final UInt64 MAX_LOW_WATERMARK_TARGET_EPOCH = UInt64.valueOf(12); @Test public void findsExistingGvrInDb(final Handle handle) { @@ -38,6 +42,16 @@ public void findsExistingGvrInDb(final Handle handle) { assertThat(existingGvr).contains(Bytes32.leftPad(Bytes.of(3))); } + @Test + public void findsExistingGvrAfterHighWatermarkIsSet(final Handle handle) { + insertGvr(handle, Bytes32.leftPad(Bytes.of(3))); + updateHighWatermark(handle, 1, 2); + + final Optional existingGvr = metadataDao.findGenesisValidatorsRoot(handle); + assertThat(existingGvr).isNotEmpty(); + assertThat(existingGvr).contains(Bytes32.leftPad(Bytes.of(3))); + } + @Test public void returnsEmptyForNonExistingGvrInDb(final Handle handle) { assertThat(metadataDao.findGenesisValidatorsRoot(handle)).isEmpty(); @@ -66,10 +80,159 @@ public void failsInsertingMultipleGvrIntoDb(final Handle handle) { .hasMessageContaining("duplicate key value violates unique constraint"); } + @Test + public void findsExistingHighWatermark(final Handle handle) { + insertGvr(handle, Bytes32.leftPad(Bytes.of(3))); + updateHighWatermark(handle, 1, 2); + + final Optional existingHighWatermark = metadataDao.findHighWatermark(handle); + + assertThat(existingHighWatermark).isNotEmpty(); + assertThat(existingHighWatermark) + .contains(new HighWatermark(UInt64.valueOf(2), UInt64.valueOf(1))); + } + + @Test + public void returnsEmptyForNonExistingHighWatermark(final Handle handle) { + assertThat(metadataDao.findHighWatermark(handle)).isEmpty(); + } + + @Test + public void returnsEmptyForNonExistingHighWatermarkWhenGvrSet(final Handle handle) { + insertGvr(handle, Bytes32.leftPad(Bytes.of(3))); + assertThat(metadataDao.findHighWatermark(handle)).isEmpty(); + } + + @Test + public void insertsHighWatermark(final Handle handle) { + insertGvr(handle, Bytes32.leftPad(Bytes.of(3))); + HighWatermark highWatermark = new HighWatermark(UInt64.valueOf(2), UInt64.valueOf(1)); + + int updateCount = metadataDao.updateHighWatermark(handle, highWatermark); + + assertThat(updateCount).isEqualTo(1); + final List highWatermarks = + handle + .createQuery( + "SELECT high_watermark_epoch as epoch, high_watermark_slot as slot FROM metadata") + .mapToBean(HighWatermark.class) + .list(); + assertThat(highWatermarks.size()).isEqualTo(1); + assertThat(highWatermarks.get(0)).isEqualTo(highWatermark); + } + + @Test + public void updatesHighWatermark(final Handle handle) { + insertGvr(handle, Bytes32.leftPad(Bytes.of(3))); + updateHighWatermark(handle, 1, 2); + HighWatermark highWatermark = createHighWatermark(3, 3); + + int updateCount = metadataDao.updateHighWatermark(handle, highWatermark); + + assertThat(updateCount).isEqualTo(1); + final List highWatermarks = + handle + .createQuery( + "SELECT high_watermark_epoch as epoch, high_watermark_slot as slot FROM metadata") + .mapToBean(HighWatermark.class) + .list(); + assertThat(highWatermarks.size()).isEqualTo(1); + assertThat(highWatermarks.get(0)).isEqualTo(highWatermark); + } + + @Test + public void updateHighWatermarkWhenNoGvrHasNoEffect(final Handle handle) { + int updateCount = metadataDao.updateHighWatermark(handle, createHighWatermark(1, 1)); + assertThat(updateCount).isEqualTo(0); + } + + @Test + public void updateHighWatermarkFailsWhenNotGreaterThanMaxLowWatermarkSlot(final Handle handle) { + insertGvr(handle, Bytes32.leftPad(Bytes.of(3))); + insertLowWatermarks(handle); + // high watermark == max low watermark + HighWatermark highWatermark = createHighWatermark(MAX_LOW_WATERMARK_SLOT, UInt64.MAX_VALUE); + assertThatThrownBy(() -> metadataDao.updateHighWatermark(handle, highWatermark)) + .hasMessageContaining( + "high_watermark_slot must be greater than max slot in low_watermarks table"); + } + + @Test + public void updateHighWatermarkFailsWhenNotGreaterThanMaxLowWatermarkTargetEpoch( + final Handle handle) { + insertGvr(handle, Bytes32.leftPad(Bytes.of(3))); + insertLowWatermarks(handle); + // high watermark == max low watermark + HighWatermark highWatermark = + createHighWatermark(UInt64.MAX_VALUE, MAX_LOW_WATERMARK_TARGET_EPOCH); + + assertThatThrownBy(() -> metadataDao.updateHighWatermark(handle, highWatermark)) + .hasMessageContaining( + "high_watermark_epoch must be greater than max epoch in low_watermarks table"); + } + + @Test + public void updateHighWatermarkFailsWhenNotGreaterThanMaxLowWatermarkSourceEpoch( + final Handle handle) { + insertGvr(handle, Bytes32.leftPad(Bytes.of(3))); + insertLowWatermarks(handle); + // high watermark == max low watermark + HighWatermark highWatermark = + createHighWatermark(UInt64.MAX_VALUE, MAX_LOW_WATERMARK_SOURCE_EPOCH); + + assertThatThrownBy(() -> metadataDao.updateHighWatermark(handle, highWatermark)) + .hasMessageContaining( + "high_watermark_epoch must be greater than max epoch in low_watermarks table"); + } + + @Test + public void deletesHighWatermark(final Handle handle) { + insertGvr(handle, Bytes32.leftPad(Bytes.of(3))); + updateHighWatermark(handle, 2, 2); + assertThat(metadataDao.findHighWatermark(handle)).isNotEmpty(); + + metadataDao.deleteHighWatermark(handle); + + assertThat(metadataDao.findHighWatermark(handle)).isEmpty(); + } + private void insertGvr(final Handle handle, final Bytes genesisValidatorsRoot) { handle.execute( "INSERT INTO metadata (id, genesis_validators_root) VALUES (?, ?)", 1, genesisValidatorsRoot); } + + private void insertLowWatermarks(Handle handle) { + handle.execute("INSERT INTO validators (public_key, enabled) VALUES (?, ?)", Bytes.of(1), true); + handle.execute("INSERT INTO validators (public_key, enabled) VALUES (?, ?)", Bytes.of(2), true); + handle.execute( + "INSERT INTO low_watermarks (validator_id, slot, target_epoch, source_epoch) VALUES (?, ?, ?, ?)", + 1, + 3, + 2, + 1); + handle.execute( + "INSERT INTO low_watermarks (validator_id, slot, target_epoch, source_epoch) VALUES (?, ?, ?, ?)", + 2, + MAX_LOW_WATERMARK_SLOT, + MAX_LOW_WATERMARK_TARGET_EPOCH, + MAX_LOW_WATERMARK_SOURCE_EPOCH); + } + + private void updateHighWatermark(final Handle handle, final int epoch, final int slot) { + handle + .createUpdate("UPDATE metadata set high_watermark_epoch=:epoch, high_watermark_slot=:slot") + .bind("epoch", epoch) + .bind("slot", slot) + .execute(); + } + + private HighWatermark createHighWatermark(final int slot, final int epoch) { + return new HighWatermark(UInt64.valueOf(slot), UInt64.valueOf(epoch)); + } + + private HighWatermark createHighWatermark(final UInt64 slot, final UInt64 epoch) { + return new HighWatermark(slot, epoch); + } }