Skip to content

Commit

Permalink
refactors into reusable BlobCache, and rekeys on versioned hash
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Florentine <[email protected]>
  • Loading branch information
jflo committed Nov 9, 2023
1 parent e42e889 commit 3b67fa5
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@
package org.hyperledger.besu.datatypes;

import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** A class to hold the blobs, commitments, proofs and versioned hashes for a set of blobs. */
public class BlobsWithCommitments {
private final List<KZGCommitment> kzgCommitments;
private final List<Blob> blobs;
private final List<KZGProof> kzgProofs;

private final List<VersionedHash> versionedHashes;
public record BlobQuad(
Blob blob, KZGCommitment kzgCommitment, KZGProof kzgProof, VersionedHash versionedHash) {}
;

private final List<BlobQuad> blobQuads;

/**
* A class to hold the blobs, commitments and proofs for a set of blobs.
Expand All @@ -48,10 +51,17 @@ public BlobsWithCommitments(
throw new InvalidParameterException(
"There must be an equal number of blobs, commitments, proofs, and versioned hashes");
}
this.kzgCommitments = kzgCommitments;
this.blobs = blobs;
this.kzgProofs = kzgProofs;
this.versionedHashes = versionedHashes;
ArrayList<BlobQuad> toBuild = new ArrayList<>();
for (int i = 0; i < blobs.size(); i++) {
toBuild.add(
new BlobQuad(
blobs.get(i), kzgCommitments.get(i), kzgProofs.get(i), versionedHashes.get(i)));
}
this.blobQuads = toBuild;
}

public BlobsWithCommitments(final List<BlobQuad> quads) {
this.blobQuads = quads;
}

/**
Expand All @@ -60,7 +70,7 @@ public BlobsWithCommitments(
* @return the blobs
*/
public List<Blob> getBlobs() {
return blobs;
return blobQuads.stream().map(BlobQuad::blob).collect(Collectors.toList());
}

/**
Expand All @@ -69,7 +79,7 @@ public List<Blob> getBlobs() {
* @return the commitments
*/
public List<KZGCommitment> getKzgCommitments() {
return kzgCommitments;
return blobQuads.stream().map(BlobQuad::kzgCommitment).collect(Collectors.toList());
}

/**
Expand All @@ -78,7 +88,7 @@ public List<KZGCommitment> getKzgCommitments() {
* @return the proofs
*/
public List<KZGProof> getKzgProofs() {
return kzgProofs;
return blobQuads.stream().map(BlobQuad::kzgProof).collect(Collectors.toList());
}

/**
Expand All @@ -87,6 +97,10 @@ public List<KZGProof> getKzgProofs() {
* @return the hashes
*/
public List<VersionedHash> getVersionedHashes() {
return versionedHashes;
return blobQuads.stream().map(BlobQuad::versionedHash).collect(Collectors.toList());
}

public List<BlobQuad> getBlobQuads() {
return blobQuads;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void copyFromIsIdentical() {
final Transaction copy = builder.copiedFrom(transaction).build();
assertThat(copy).isEqualTo(transaction);
assertThat(copy == transaction).isFalse();
assertThat(copy.getHash()).isEqualTo(transaction.getHash());
BytesValueRLPOutput sourceRLP = new BytesValueRLPOutput();
transaction.writeTo(sourceRLP);
BytesValueRLPOutput copyRLP = new BytesValueRLPOutput();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright Hyperledger Besu contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.hyperledger.besu.ethereum.eth.transactions;

import org.hyperledger.besu.datatypes.BlobsWithCommitments;
import org.hyperledger.besu.datatypes.VersionedHash;
import org.hyperledger.besu.ethereum.core.Transaction;

import java.util.List;
import java.util.Optional;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlobCache {
private final Cache<VersionedHash, BlobsWithCommitments.BlobQuad> cache;
private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class);

public BlobCache() {
// TODO: needs size limit, ttl policy and eviction on finalization policy - cache size should
// max out around 6 * blocks since final

this.cache = Caffeine.newBuilder().build();
}

public void cacheBlobs(final Transaction t) {
if (t.getType().supportsBlob()) {
var bwc = t.getBlobsWithCommitments();
if (bwc.isPresent()) {
bwc.get().getBlobQuads().stream()
.forEach(blobQuad -> this.cache.put(blobQuad.versionedHash(), blobQuad));
} else {
LOG.debug("transaction is missing blobs, cannot cache");
}
}
}

public Optional<Transaction> restoreBlob(final Transaction transaction) {
if (transaction.getType().supportsBlob()) {
Optional<List<VersionedHash>> maybeHashes = transaction.getVersionedHashes();
if (maybeHashes.isPresent()) {
if (!maybeHashes.get().isEmpty()) {
Transaction.Builder txBuilder = Transaction.builder();
txBuilder.copiedFrom(transaction);
List<BlobsWithCommitments.BlobQuad> blobQuads =
maybeHashes.get().stream().map(cache::getIfPresent).toList();
final BlobsWithCommitments bwc = new BlobsWithCommitments(blobQuads);
if (blobQuads.stream()
.map(BlobsWithCommitments.BlobQuad::versionedHash)
.toList()
.containsAll(transaction.getVersionedHashes().get())) {
txBuilder.blobsWithCommitments(bwc);
return Optional.of(txBuilder.build());
} else {
LOG.debug("did not find all versioned hashes to restore from cache");
return Optional.empty();
}
} else {
LOG.debug("can't restore blobs for transaction with empty list of versioned hashes");
return Optional.empty();
}
} else {
LOG.debug("can't restore blobs for transaction without list of versioned hashes");
return Optional.empty();
}

} else {
LOG.debug(
"can't restore blobs for non-blob transaction of type " + transaction.getType().name());
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.REPLACED;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.BlobsWithCommitments;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
Expand All @@ -53,8 +53,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -77,7 +75,7 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer {
private OptionalLong nextLayerOnDroppedListenerId = OptionalLong.empty();
protected long spaceUsed = 0;

private final Cache<Hash, BlobsWithCommitments> blobCache;
private final BlobCache blobCache;

public AbstractTransactionsLayer(
final TransactionPoolConfiguration poolConfig,
Expand All @@ -93,7 +91,7 @@ public AbstractTransactionsLayer(
metrics.initTransactionCount(pendingTransactions::size, name());
metrics.initUniqueSenderCount(txsBySender::size, name());
// TODO: needs size limit, ttl policy and eviction on finalization policy
this.blobCache = Caffeine.newBuilder().build();
this.blobCache = new BlobCache();
}

protected abstract boolean gapsAllowed();
Expand Down Expand Up @@ -370,9 +368,7 @@ protected PendingTransaction processRemove(
final PendingTransaction removedTx = pendingTransactions.remove(transaction.getHash());
if (removedTx.getTransaction().getBlobsWithCommitments().isPresent()
&& CONFIRMED.equals(removalReason)) {
this.blobCache.put(
removedTx.getTransaction().getHash(),
removedTx.getTransaction().getBlobsWithCommitments().get());
this.blobCache.cacheBlobs(removedTx.getTransaction());
}
if (removedTx != null) {
decreaseSpaceUsed(removedTx);
Expand Down Expand Up @@ -612,7 +608,7 @@ boolean consistencyCheck(
protected abstract void internalConsistencyCheck(
final Map<Address, TreeMap<Long, PendingTransaction>> prevLayerTxsBySender);

public Cache<Hash, BlobsWithCommitments> getBlobCache() {
public BlobCache getBlobCache() {
return blobCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.RECONCILED;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.BlobsWithCommitments;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
Expand Down Expand Up @@ -539,15 +538,6 @@ public synchronized String logStats() {

@Override
public Optional<Transaction> restoreBlob(final Transaction transaction) {
Transaction.Builder txBuilder = Transaction.builder();
txBuilder.copiedFrom(transaction);
final BlobsWithCommitments bwc =
prioritizedTransactions.getBlobCache().getIfPresent(transaction.getHash());
if (bwc != null) {
txBuilder.blobsWithCommitments(bwc);
return Optional.of(txBuilder.build());
} else {
return Optional.empty();
}
return prioritizedTransactions.getBlobCache().restoreBlob(transaction);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.BlobsWithCommitments;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
Expand Down Expand Up @@ -60,8 +60,6 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -100,7 +98,7 @@ public abstract class AbstractPendingTransactionsSorter implements PendingTransa
protected final TransactionPoolReplacementHandler transactionReplacementHandler;
protected final Supplier<BlockHeader> chainHeadHeaderSupplier;

private final Cache<Hash, BlobsWithCommitments> blobCache;
private final BlobCache blobCache;

public AbstractPendingTransactionsSorter(
final TransactionPoolConfiguration poolConfig,
Expand Down Expand Up @@ -135,8 +133,8 @@ public AbstractPendingTransactionsSorter(
"transactions",
"Current size of the transaction pool",
pendingTransactions::size);
// TODO: needs size limit, ttl policy and eviction on finalization policy
this.blobCache = Caffeine.newBuilder().build();

this.blobCache = new BlobCache();
}

@Override
Expand Down Expand Up @@ -421,9 +419,7 @@ private void removeTransaction(final Transaction transaction, final boolean adde
removedPendingTx.isReceivedFromLocalSource(), addedToBlock);
if (removedPendingTx.getTransaction().getBlobsWithCommitments().isPresent()
&& addedToBlock) {
this.blobCache.put(
removedPendingTx.getTransaction().getHash(),
removedPendingTx.getTransaction().getBlobsWithCommitments().get());
this.blobCache.cacheBlobs(removedPendingTx.getTransaction());
}
}
}
Expand Down Expand Up @@ -548,16 +544,13 @@ public void signalInvalidAndRemoveDependentTransactions(final Transaction transa
}
}

/**
* @param transaction to restore blobs onto
* @return an optional copy of the supplied transaction, but with the BlobsWithCommitments
* restored. If none could be restored, empty.
*/
@Override
public Optional<Transaction> restoreBlob(final Transaction transaction) {
Transaction.Builder txBuilder = Transaction.builder();
txBuilder.copiedFrom(transaction);
final BlobsWithCommitments bwc = blobCache.getIfPresent(transaction.getHash());
if (bwc != null) {
txBuilder.blobsWithCommitments(bwc);
return Optional.of(txBuilder.build());
} else {
return Optional.empty();
}
return blobCache.restoreBlob(transaction);
}
}

0 comments on commit 3b67fa5

Please sign in to comment.