Skip to content

Commit

Permalink
fix: consume all system config event
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkAfCod authored and GrapeBaBa committed Jul 12, 2024
1 parent a1523d6 commit 6d02b0d
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 53 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ services:
command:
- --config.file=/etc/prometheus/prometheus.yml
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
- ./docker/prometheus.yml:/etc/prometheus/prometheus.yml:ro
<<: *logging

node-exporter:
Expand Down
6 changes: 3 additions & 3 deletions hildr-node/src/main/java/io/optimism/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public boolean isEcotoneActivationBlock(BigInteger time) {
* @return true if the time is the ecotone activation block, otherwise false.
*/
public boolean isEcotone(BigInteger time) {
return ecotoneTime.compareTo(BigInteger.ZERO) > 0 && time.compareTo(ecotoneTime) >= 0;
return ecotoneTime.compareTo(BigInteger.ZERO) >= 0 && time.compareTo(ecotoneTime) >= 0;
}

/**
Expand All @@ -324,7 +324,7 @@ public boolean isEcotoneAndNotFirst(BigInteger time) {
* @return true if the time is the fjord activation block, otherwise false.
*/
public boolean isFjord(BigInteger time) {
return fjordTime.compareTo(BigInteger.ZERO) > 0 && time.compareTo(fjordTime) >= 0;
return fjordTime.compareTo(BigInteger.ZERO) >= 0 && time.compareTo(fjordTime) >= 0;
}

/**
Expand All @@ -346,7 +346,7 @@ public boolean isFjordActivationBlock(BigInteger time) {
* @return true if the time is the canyon activation block, otherwise false.
*/
public boolean isCanyon(BigInteger time) {
return canyonTime.compareTo(BigInteger.ZERO) > 0 && time.compareTo(canyonTime) >= 0;
return canyonTime.compareTo(BigInteger.ZERO) >= 0 && time.compareTo(canyonTime) >= 0;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion hildr-node/src/main/java/io/optimism/driver/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ private void advanceUnsafeHead() throws ExecutionException, InterruptedException
.subtract(syncedBlockNum)
.compareTo(BigInteger.valueOf(1024L))
< 0)
.toList();
.collect(Collectors.toList());
}
}
if (this.futureUnsafeBlocks.isEmpty()) {
Expand Down
152 changes: 104 additions & 48 deletions hildr-node/src/main/java/io/optimism/l1/InnerWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
import io.optimism.utilities.rpc.Web3jProvider;
import io.optimism.utilities.telemetry.Logging;
import io.optimism.utilities.telemetry.TracerTaskWrapper;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import java.math.BigInteger;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -45,7 +49,6 @@
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.protocol.core.methods.response.EthLog;
import org.web3j.protocol.core.methods.response.EthLog.LogObject;
import org.web3j.protocol.core.methods.response.EthLog.LogResult;
import org.web3j.protocol.websocket.events.NewHead;
import org.web3j.tuples.generated.Tuple2;
import org.web3j.tuples.generated.Tuple3;
Expand Down Expand Up @@ -85,7 +88,7 @@ public class InnerWatcher extends AbstractExecutionThreadService {
*/
private final Web3j provider;

private final Web3j wsProvider;
private Web3j wsProvider;

/**
* Beacon blob fetcher to fetch the beacon blob from the L1 beacon endpoint.
Expand Down Expand Up @@ -156,6 +159,8 @@ public class InnerWatcher extends AbstractExecutionThreadService {

private boolean devnet = false;

private ScheduledThreadPoolExecutor scheduledExecutorService;

/**
* create a InnerWatcher instance.
*
Expand All @@ -168,7 +173,9 @@ public InnerWatcher(
Config config, MessagePassingQueue<BlockUpdate> queue, BigInteger l1StartBlock, BigInteger l2StartBlock) {
this.config = config;
this.provider = Web3jProvider.createClient(config.l1RpcUrl());
this.wsProvider = Web3jProvider.createClient(config.l1WsRpcUrl());
if (StringUtils.isNotEmpty(config.l1WsRpcUrl())) {
this.wsProvider = Web3jProvider.createClient(config.l1WsRpcUrl());
}
this.beaconFetcher = new BeaconBlobFetcher(config.l1BeaconUrl(), config.l1BeaconArchiverUrl());
this.l2StartBlock = l2StartBlock;
this.devnet = config.devnet() != null && config.devnet();
Expand Down Expand Up @@ -213,22 +220,49 @@ private void getMetadataFromL2(BigInteger l2StartBlock) {
}

private Disposable subscribeL1NewHeads() {
this.l1HeadListener = this.wsProvider
.newHeadsNotifications()
.subscribe(
notification -> {
NewHead header = notification.getParams().getResult();
String hash = header.getHash();
BigInteger number = Numeric.toBigInt(header.getNumber());
String parentHash = header.getParentHash();
BigInteger time = Numeric.toBigInt(header.getTimestamp());
l1Head = new BlockInfo(hash, number, parentHash, time);
},
t -> {
if (t instanceof WebsocketNotConnectedException) {
this.subscribeL1NewHeads();
}
});
if (this.wsProvider != null) {
this.l1HeadListener = this.wsProvider
.newHeadsNotifications()
.subscribe(
notification -> {
NewHead header = notification.getParams().getResult();
String hash = header.getHash();
BigInteger number = Numeric.toBigInt(header.getNumber());
String parentHash = header.getParentHash();
BigInteger time = Numeric.toBigInt(header.getTimestamp());
l1Head = new BlockInfo(hash, number, parentHash, time);
},
t -> {
if (t instanceof WebsocketNotConnectedException) {
this.subscribeL1NewHeads();
}
});
} else {
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
this.l1HeadListener = Flowable.create(
(subscriber) -> {
this.scheduledExecutorService.scheduleAtFixedRate(
() -> {
EthBlock.Block block = null;
try {
block = pollBlock(
this.provider, DefaultBlockParameterName.LATEST, false);
} catch (ExecutionException | InterruptedException e) {
LOGGER.warn("error while fetching L1 data for block", e);
}
subscriber.onNext(block);
},
0,
12,
TimeUnit.SECONDS);
},
BackpressureStrategy.BUFFER)
.subscribe(notification -> {
EthBlock.Block block = (EthBlock.Block) notification;
l1Head = BlockInfo.from(block);
});
}

return this.l1HeadListener;
}

Expand Down Expand Up @@ -381,30 +415,45 @@ private void putBlockUpdate(final BlockUpdate update) {

private void updateSystemConfig(BlockInfo l1BlockInfo) throws ExecutionException, InterruptedException {
BigInteger preLastUpdateBlock = this.systemConfigUpdate.component1();
if (preLastUpdateBlock.compareTo(this.currentBlock) < 0) {
BigInteger toBlock = preLastUpdateBlock.add(BigInteger.valueOf(1000L));
if (preLastUpdateBlock.compareTo(this.currentBlock) <= 0) {
BigInteger fromBlock = preLastUpdateBlock.equals(BigInteger.ZERO)
? BigInteger.ZERO
: preLastUpdateBlock.add(BigInteger.ONE);
BigInteger toBlock = preLastUpdateBlock.add(BigInteger.valueOf(100L));
LOGGER.debug(
"will get system update eth log: fromBlock={} -> toBlock={}; contract={}",
preLastUpdateBlock.add(BigInteger.ONE),
fromBlock,
toBlock,
InnerWatcher.this.config.chainConfig().systemConfigContract());
EthLog updates = this.getLog(
preLastUpdateBlock.add(BigInteger.ONE),
fromBlock,
toBlock,
InnerWatcher.this.config.chainConfig().systemConfigContract(),
CONFIG_UPDATE_TOPIC);

if (updates.getLogs().isEmpty()) {
this.systemConfigUpdate = new Tuple2<>(toBlock, null);
} else {
LogResult<?> update = updates.getLogs().getFirst();
BigInteger updateBlock = ((LogObject) update).getBlockNumber();
SystemConfigUpdate configUpdate = SystemConfigUpdate.tryFrom((LogObject) update);
if (updateBlock == null) {
BigInteger updateBlockNum = ((LogObject) updates.getLogs().getFirst()).getBlockNumber();
SystemConfig updatedConfig = this.systemConfig;
boolean updated = false;
for (int i = 0; i < updates.getLogs().size(); i++) {
LogObject update = (LogObject) updates.getLogs().get(i);
BigInteger updateBlock = update.getBlockNumber();
if (updateBlock == null) {
break;
}
if (!updateBlock.equals(updateBlockNum)) {
break;
}
SystemConfigUpdate configUpdate = SystemConfigUpdate.tryFrom(update);
updatedConfig = parseSystemConfigUpdate(updatedConfig, l1BlockInfo, configUpdate);
updated = true;
}
if (!updated) {
this.systemConfigUpdate = new Tuple2<>(toBlock, null);
} else {
SystemConfig updateSystemConfig = parseSystemConfigUpdate(l1BlockInfo, configUpdate);
this.systemConfigUpdate = new Tuple2<>(updateBlock, updateSystemConfig);
this.systemConfigUpdate = new Tuple2<>(updateBlockNum, updatedConfig);
}
}
}
Expand All @@ -417,46 +466,47 @@ private void updateSystemConfig(BlockInfo l1BlockInfo) throws ExecutionException
}
}

private Config.SystemConfig parseSystemConfigUpdate(BlockInfo l1BlockInfo, SystemConfigUpdate configUpdate) {
private Config.SystemConfig parseSystemConfigUpdate(
SystemConfig lastSystemConfig, BlockInfo l1BlockInfo, SystemConfigUpdate configUpdate) {
Config.SystemConfig updateSystemConfig = null;
if (configUpdate instanceof SystemConfigUpdate.BatchSender) {
updateSystemConfig = new Config.SystemConfig(
((SystemConfigUpdate.BatchSender) configUpdate).getAddress(),
this.systemConfig.gasLimit(),
this.systemConfig.l1FeeOverhead(),
this.systemConfig.l1FeeScalar(),
this.systemConfig.unsafeBlockSigner());
lastSystemConfig.gasLimit(),
lastSystemConfig.l1FeeOverhead(),
lastSystemConfig.l1FeeScalar(),
lastSystemConfig.unsafeBlockSigner());
} else if (configUpdate instanceof SystemConfigUpdate.Fees) {
var ecotoneTime = this.config.chainConfig().ecotoneTime();
if (ecotoneTime.compareTo(BigInteger.ZERO) > 0
&& l1BlockInfo.timestamp().compareTo(ecotoneTime) >= 0) {
updateSystemConfig = new Config.SystemConfig(
this.systemConfig.batchSender(),
this.systemConfig.gasLimit(),
lastSystemConfig.batchSender(),
lastSystemConfig.gasLimit(),
BigInteger.ZERO,
((SystemConfigUpdate.Fees) configUpdate).getFeeScalar(),
this.systemConfig.unsafeBlockSigner());
lastSystemConfig.unsafeBlockSigner());
} else {
updateSystemConfig = new Config.SystemConfig(
this.systemConfig.batchSender(),
this.systemConfig.gasLimit(),
lastSystemConfig.batchSender(),
lastSystemConfig.gasLimit(),
((SystemConfigUpdate.Fees) configUpdate).getFeeOverhead(),
((SystemConfigUpdate.Fees) configUpdate).getFeeScalar(),
this.systemConfig.unsafeBlockSigner());
lastSystemConfig.unsafeBlockSigner());
}
} else if (configUpdate instanceof SystemConfigUpdate.GasLimit) {
updateSystemConfig = new Config.SystemConfig(
this.systemConfig.batchSender(),
lastSystemConfig.batchSender(),
((SystemConfigUpdate.GasLimit) configUpdate).getGas(),
this.systemConfig.l1FeeOverhead(),
this.systemConfig.l1FeeScalar(),
this.systemConfig.unsafeBlockSigner());
lastSystemConfig.l1FeeOverhead(),
lastSystemConfig.l1FeeScalar(),
lastSystemConfig.unsafeBlockSigner());
} else if (configUpdate instanceof SystemConfigUpdate.UnsafeBlockSigner) {
updateSystemConfig = new Config.SystemConfig(
this.systemConfig.batchSender(),
this.systemConfig.gasLimit(),
this.systemConfig.l1FeeOverhead(),
this.systemConfig.l1FeeScalar(),
lastSystemConfig.batchSender(),
lastSystemConfig.gasLimit(),
lastSystemConfig.l1FeeOverhead(),
lastSystemConfig.l1FeeScalar(),
((SystemConfigUpdate.UnsafeBlockSigner) configUpdate).getAddress());
}
return updateSystemConfig;
Expand Down Expand Up @@ -601,6 +651,12 @@ protected void shutDown() {
if (!this.l1HeadListener.isDisposed()) {
this.l1HeadListener.dispose();
}
if (this.wsProvider != null) {
this.wsProvider.shutdown();
}
if (this.scheduledExecutorService != null) {
this.scheduledExecutorService.shutdown();
}
}

@Override
Expand Down

0 comments on commit 6d02b0d

Please sign in to comment.