diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 51d7141c..2496c2ce 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -113,49 +113,55 @@ func (d *datastoreWrapper) WriteBlock(ctx context.Context, peer *enode.Node, blo d.writeEvent(peer, blockEventsKind, block.Hash(), blocksKind) key := datastore.NameKey(blocksKind, block.Hash().Hex(), nil) - var dsBlock DatastoreBlock - // Fetch the block. We don't check the error because if some of the fields - // are nil we will just set them. - _ = d.client.Get(ctx, key, &dsBlock) - shouldWrite := false + _, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error { + var dsBlock DatastoreBlock + // Fetch the block. We don't check the error because if some of the fields + // are nil we will just set them. + _ = tx.Get(key, &dsBlock) - if dsBlock.DatastoreHeader == nil { - shouldWrite = true - dsBlock.DatastoreHeader = newDatastoreHeader(block.Header()) - } + shouldWrite := false - if len(dsBlock.TotalDifficulty) == 0 { - shouldWrite = true - dsBlock.TotalDifficulty = td.String() - } + if dsBlock.DatastoreHeader == nil { + shouldWrite = true + dsBlock.DatastoreHeader = newDatastoreHeader(block.Header()) + } - if dsBlock.Transactions == nil && len(block.Transactions()) > 0 { - shouldWrite = true - if d.shouldWriteTransactions { - d.writeTransactions(ctx, block.Transactions()) + if len(dsBlock.TotalDifficulty) == 0 { + shouldWrite = true + dsBlock.TotalDifficulty = td.String() } - dsBlock.Transactions = make([]*datastore.Key, 0, len(block.Transactions())) - for _, tx := range block.Transactions() { - dsBlock.Transactions = append(dsBlock.Transactions, datastore.NameKey(transactionsKind, tx.Hash().Hex(), nil)) + if dsBlock.Transactions == nil && len(block.Transactions()) > 0 { + shouldWrite = true + if d.shouldWriteTransactions { + d.writeTransactions(ctx, block.Transactions()) + } + + dsBlock.Transactions = make([]*datastore.Key, 0, len(block.Transactions())) + for _, tx := range block.Transactions() { + dsBlock.Transactions = append(dsBlock.Transactions, datastore.NameKey(transactionsKind, tx.Hash().Hex(), nil)) + } } - } - if dsBlock.Uncles == nil && len(block.Uncles()) > 0 { - shouldWrite = true - dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles())) - for _, uncle := range block.Uncles() { - d.writeBlockHeader(ctx, uncle) - dsBlock.Uncles = append(dsBlock.Uncles, datastore.NameKey(blocksKind, uncle.Hash().Hex(), nil)) + if dsBlock.Uncles == nil && len(block.Uncles()) > 0 { + shouldWrite = true + dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles())) + for _, uncle := range block.Uncles() { + d.writeBlockHeader(ctx, uncle) + dsBlock.Uncles = append(dsBlock.Uncles, datastore.NameKey(blocksKind, uncle.Hash().Hex(), nil)) + } } - } - if !shouldWrite { - return - } + if shouldWrite { + _, err := tx.Put(key, &dsBlock) + return err + } + + return nil + }) - if _, err := d.client.Put(ctx, key, &dsBlock); err != nil { + if err != nil { log.Error().Err(err).Msg("Failed to write new block") } } @@ -177,33 +183,46 @@ func (d *datastoreWrapper) WriteBlockHeaders(ctx context.Context, headers []*typ // don't already exist. func (d *datastoreWrapper) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash) { key := datastore.NameKey(blocksKind, hash.Hex(), nil) - var block DatastoreBlock - if err := d.client.Get(ctx, key, &block); err != nil { - log.Debug().Err(err).Str("hash", hash.Hex()).Msg("Failed to fetch block when writing block body") - } + _, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error { + var block DatastoreBlock + if err := tx.Get(key, &block); err != nil { + log.Debug().Err(err).Str("hash", hash.Hex()).Msg("Failed to fetch block when writing block body") + } + + shouldWrite := false - if block.Transactions == nil && len(body.Transactions) > 0 { - if d.shouldWriteTransactions { - d.writeTransactions(ctx, body.Transactions) + if block.Transactions == nil && len(body.Transactions) > 0 { + shouldWrite = true + if d.shouldWriteTransactions { + d.writeTransactions(ctx, body.Transactions) + } + + block.Transactions = make([]*datastore.Key, 0, len(body.Transactions)) + for _, tx := range body.Transactions { + block.Transactions = append(block.Transactions, datastore.NameKey(transactionsKind, tx.Hash().Hex(), nil)) + } } - block.Transactions = make([]*datastore.Key, 0, len(body.Transactions)) - for _, tx := range body.Transactions { - block.Transactions = append(block.Transactions, datastore.NameKey(transactionsKind, tx.Hash().Hex(), nil)) + if block.Uncles == nil && len(body.Uncles) > 0 { + shouldWrite = true + block.Uncles = make([]*datastore.Key, 0, len(body.Uncles)) + for _, uncle := range body.Uncles { + d.writeBlockHeader(ctx, uncle) + block.Uncles = append(block.Uncles, datastore.NameKey(blocksKind, uncle.Hash().Hex(), nil)) + } } - } - if block.Uncles == nil && len(body.Uncles) > 0 { - block.Uncles = make([]*datastore.Key, 0, len(body.Uncles)) - for _, uncle := range body.Uncles { - d.writeBlockHeader(ctx, uncle) - block.Uncles = append(block.Uncles, datastore.NameKey(blocksKind, uncle.Hash().Hex(), nil)) + if shouldWrite { + _, err := tx.Put(key, &block) + return err } - } - if _, err := d.client.Put(ctx, key, &block); err != nil { - log.Error().Err(err).Msg("Failed to write block header") + return nil + }) + + if err != nil { + log.Error().Err(err).Msg("Failed to write block body") } } @@ -338,15 +357,19 @@ func (d *datastoreWrapper) writeEvents(ctx context.Context, peer *enode.Node, ev // exist. func (d *datastoreWrapper) writeBlockHeader(ctx context.Context, header *types.Header) { key := datastore.NameKey(blocksKind, header.Hash().Hex(), nil) - var block DatastoreBlock - if err := d.client.Get(ctx, key, &block); err == nil && block.DatastoreHeader != nil { - return - } + _, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error { + var block DatastoreBlock + if err := tx.Get(key, &block); err == nil && block.DatastoreHeader != nil { + return nil + } - block.DatastoreHeader = newDatastoreHeader(header) + block.DatastoreHeader = newDatastoreHeader(header) + _, err := tx.Put(key, &block) + return err + }) - if _, err := d.client.Put(ctx, key, &block); err != nil { + if err != nil { log.Error().Err(err).Msg("Failed to write block header") } }