Skip to content

Commit

Permalink
write blocks using datastore transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
minhd-vu committed Jun 13, 2023
1 parent 881a90b commit 2e27f72
Showing 1 changed file with 80 additions and 57 deletions.
137 changes: 80 additions & 57 deletions p2p/database/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,49 +113,55 @@ func (d *datastoreWrapper) WriteBlock(ctx context.Context, peer *enode.Node, blo
d.writeEvent(peer, blockEventsKind, block.Hash(), blocksKind)

key := datastore.NameKey(blocksKind, block.Hash().Hex(), nil)
var dsBlock DatastoreBlock
// Fetch the block. We don't check the error because if some of the fields
// are nil we will just set them.
_ = d.client.Get(ctx, key, &dsBlock)

shouldWrite := false
_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
var dsBlock DatastoreBlock
// Fetch the block. We don't check the error because if some of the fields
// are nil we will just set them.
_ = tx.Get(key, &dsBlock)

if dsBlock.DatastoreHeader == nil {
shouldWrite = true
dsBlock.DatastoreHeader = newDatastoreHeader(block.Header())
}
shouldWrite := false

if len(dsBlock.TotalDifficulty) == 0 {
shouldWrite = true
dsBlock.TotalDifficulty = td.String()
}
if dsBlock.DatastoreHeader == nil {
shouldWrite = true
dsBlock.DatastoreHeader = newDatastoreHeader(block.Header())
}

if dsBlock.Transactions == nil && len(block.Transactions()) > 0 {
shouldWrite = true
if d.shouldWriteTransactions {
d.writeTransactions(ctx, block.Transactions())
if len(dsBlock.TotalDifficulty) == 0 {
shouldWrite = true
dsBlock.TotalDifficulty = td.String()
}

dsBlock.Transactions = make([]*datastore.Key, 0, len(block.Transactions()))
for _, tx := range block.Transactions() {
dsBlock.Transactions = append(dsBlock.Transactions, datastore.NameKey(transactionsKind, tx.Hash().Hex(), nil))
if dsBlock.Transactions == nil && len(block.Transactions()) > 0 {
shouldWrite = true
if d.shouldWriteTransactions {
d.writeTransactions(ctx, block.Transactions())
}

dsBlock.Transactions = make([]*datastore.Key, 0, len(block.Transactions()))
for _, tx := range block.Transactions() {
dsBlock.Transactions = append(dsBlock.Transactions, datastore.NameKey(transactionsKind, tx.Hash().Hex(), nil))
}
}
}

if dsBlock.Uncles == nil && len(block.Uncles()) > 0 {
shouldWrite = true
dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles()))
for _, uncle := range block.Uncles() {
d.writeBlockHeader(ctx, uncle)
dsBlock.Uncles = append(dsBlock.Uncles, datastore.NameKey(blocksKind, uncle.Hash().Hex(), nil))
if dsBlock.Uncles == nil && len(block.Uncles()) > 0 {
shouldWrite = true
dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles()))
for _, uncle := range block.Uncles() {
d.writeBlockHeader(ctx, uncle)
dsBlock.Uncles = append(dsBlock.Uncles, datastore.NameKey(blocksKind, uncle.Hash().Hex(), nil))
}
}
}

if !shouldWrite {
return
}
if shouldWrite {
_, err := tx.Put(key, &dsBlock)
return err
}

return nil
})

if _, err := d.client.Put(ctx, key, &dsBlock); err != nil {
if err != nil {
log.Error().Err(err).Msg("Failed to write new block")
}
}
Expand All @@ -177,33 +183,46 @@ func (d *datastoreWrapper) WriteBlockHeaders(ctx context.Context, headers []*typ
// don't already exist.
func (d *datastoreWrapper) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash) {
key := datastore.NameKey(blocksKind, hash.Hex(), nil)
var block DatastoreBlock

if err := d.client.Get(ctx, key, &block); err != nil {
log.Debug().Err(err).Str("hash", hash.Hex()).Msg("Failed to fetch block when writing block body")
}
_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
var block DatastoreBlock
if err := tx.Get(key, &block); err != nil {
log.Debug().Err(err).Str("hash", hash.Hex()).Msg("Failed to fetch block when writing block body")
}

shouldWrite := false

if block.Transactions == nil && len(body.Transactions) > 0 {
if d.shouldWriteTransactions {
d.writeTransactions(ctx, body.Transactions)
if block.Transactions == nil && len(body.Transactions) > 0 {
shouldWrite = true
if d.shouldWriteTransactions {
d.writeTransactions(ctx, body.Transactions)
}

block.Transactions = make([]*datastore.Key, 0, len(body.Transactions))
for _, tx := range body.Transactions {
block.Transactions = append(block.Transactions, datastore.NameKey(transactionsKind, tx.Hash().Hex(), nil))
}
}

block.Transactions = make([]*datastore.Key, 0, len(body.Transactions))
for _, tx := range body.Transactions {
block.Transactions = append(block.Transactions, datastore.NameKey(transactionsKind, tx.Hash().Hex(), nil))
if block.Uncles == nil && len(body.Uncles) > 0 {
shouldWrite = true
block.Uncles = make([]*datastore.Key, 0, len(body.Uncles))
for _, uncle := range body.Uncles {
d.writeBlockHeader(ctx, uncle)
block.Uncles = append(block.Uncles, datastore.NameKey(blocksKind, uncle.Hash().Hex(), nil))
}
}
}

if block.Uncles == nil && len(body.Uncles) > 0 {
block.Uncles = make([]*datastore.Key, 0, len(body.Uncles))
for _, uncle := range body.Uncles {
d.writeBlockHeader(ctx, uncle)
block.Uncles = append(block.Uncles, datastore.NameKey(blocksKind, uncle.Hash().Hex(), nil))
if shouldWrite {
_, err := tx.Put(key, &block)
return err
}
}

if _, err := d.client.Put(ctx, key, &block); err != nil {
log.Error().Err(err).Msg("Failed to write block header")
return nil
})

if err != nil {
log.Error().Err(err).Msg("Failed to write block body")
}
}

Expand Down Expand Up @@ -338,15 +357,19 @@ func (d *datastoreWrapper) writeEvents(ctx context.Context, peer *enode.Node, ev
// exist.
func (d *datastoreWrapper) writeBlockHeader(ctx context.Context, header *types.Header) {
key := datastore.NameKey(blocksKind, header.Hash().Hex(), nil)
var block DatastoreBlock

if err := d.client.Get(ctx, key, &block); err == nil && block.DatastoreHeader != nil {
return
}
_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
var block DatastoreBlock
if err := tx.Get(key, &block); err == nil && block.DatastoreHeader != nil {
return nil
}

block.DatastoreHeader = newDatastoreHeader(header)
block.DatastoreHeader = newDatastoreHeader(header)
_, err := tx.Put(key, &block)
return err
})

if _, err := d.client.Put(ctx, key, &block); err != nil {
if err != nil {
log.Error().Err(err).Msg("Failed to write block header")
}
}
Expand Down

0 comments on commit 2e27f72

Please sign in to comment.