cost control: dedupe writeEvents #422
Conversation
Hmm, we may not need this concept of sharding. Take a look at this:

```go
type DatastoreEventKey struct {
    SensorId string
    PeerID   string
    Hash     string
}

// writeEvent writes either a block or transaction event to datastore depending
// on the provided eventKind and hashKind.
func (d *Datastore) writeEvent(ctx context.Context, peer *enode.Node, eventKind string, hash common.Hash, hashKind string, tfs time.Time) {
    dek := DatastoreEventKey{
        SensorId: d.sensorID,
        PeerID:   peer.URLv4(),
        Hash:     hash.Hex(),
    }
    data, err := json.Marshal(dek)
    if err != nil {
        log.Error().Err(err).Any("datastore_event_key", dek).Msg("Failed to marshal datastore event key")
        return
    }
    checksum := sha256.Sum256(data)

    // We generate the name key from the hash of the sensor ID, peer ID, and
    // event hash.
    key := datastore.NameKey(eventKind, hex.EncodeToString(checksum[:]), nil)

    event := DatastoreEvent{
        SensorId: d.sensorID,
        PeerId:   peer.URLv4(),
        Hash:     datastore.NameKey(hashKind, hash.Hex(), nil),
        Time:     tfs,
        TTL:      tfs.Add(d.ttl),
    }

    _, err = d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
        // Check if the event already exists and skip writing if it does.
        var de DatastoreEvent
        if err := tx.Get(key, &de); err == nil {
            return nil
        }
        _, err := tx.Put(key, &event)
        return err
    })
    if err != nil {
        log.Error().Err(err).Msgf("Failed to write to %v", eventKind)
    }
}
```

This might be a simpler solution: it just checks Datastore for whether the event already exists, and only writes the event if it doesn't. This would increase the number of reads but decrease the number of writes. I think your solution may actually be more optimal, since we don't need to do reads at all, but I'm unsure of the memory footprint.
p2p/database/datastore.go (outdated)

```go
mu.Lock()
for key, timestamp := range shard {
    if timestamp.Before(expiryTime) {
        delete(shard, key) // Remove stale entry
```
The thing I encountered before is that `delete` on maps doesn't actually decrease the size of the map (but it does clear the key). The map retains the maximum size it ever reached, which caused some memory issues in earlier sensor iterations.
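For reference, a minimal sketch of the usual workaround, assuming the shard is a `map[string]time.Time` as in the diff above; the `pruneShard` name and package are hypothetical, not part of the PR. Instead of deleting stale keys in place, the live entries are copied into a fresh map so the old map's backing buckets can be garbage collected.

```go
package dedupe

import "time"

// pruneShard returns a fresh map containing only the non-stale entries of
// shard. Because the old map is discarded entirely, its backing storage can
// be reclaimed, unlike deleting keys in place.
func pruneShard(shard map[string]time.Time, expiryTime time.Time) map[string]time.Time {
    fresh := make(map[string]time.Time, len(shard))
    for key, timestamp := range shard {
        if !timestamp.Before(expiryTime) {
            fresh[key] = timestamp // keep only live entries
        }
    }
    return fresh
}
```

The caller would swap the returned map back into place under the same lock that guards the shard.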
interesting, thanks for pointing out, will reexamine
adjusted per your feedback
Thanks @minhd-vu for the suggestion, it's definitely a valuable approach worth considering. However, given the high-concurrency and low-latency requirements here, I believe in-memory deduplication is more suitable. By avoiding additional reads to check for duplicates in Datastore, we eliminate potential latency bottlenecks and reduce dependency on external calls, which is critical for keeping ingestion fast under heavy load. Additionally, frequent reads would likely increase our data egress costs from Datastore to sensor nodes, impacting our overall efficiency and budget. While the in-memory approach has a higher memory footprint, we'll be careful with the memory cleanup logic to keep resource usage efficient and avoid potential issues.
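For illustration, a minimal sketch of the kind of in-memory deduplication being argued for here, assuming a map keyed by event hash with TTL-based cleanup; the `dedupeCache` type and `Seen` method are hypothetical names, not code from this PR.

```go
package dedupe

import (
    "sync"
    "time"
)

// dedupeCache records recently seen keys so callers can skip duplicate
// Datastore writes without issuing any reads.
type dedupeCache struct {
    mu   sync.Mutex
    ttl  time.Duration
    seen map[string]time.Time
}

func newDedupeCache(ttl time.Duration) *dedupeCache {
    return &dedupeCache{ttl: ttl, seen: make(map[string]time.Time)}
}

// Seen returns true if key was recorded within the TTL window; otherwise it
// evicts expired entries, records the key, and returns false.
func (c *dedupeCache) Seen(key string) bool {
    c.mu.Lock()
    defer c.mu.Unlock()

    now := time.Now()
    if ts, ok := c.seen[key]; ok && now.Sub(ts) < c.ttl {
        return true
    }
    // Evict stale entries so the map does not grow without bound.
    for k, ts := range c.seen {
        if now.Sub(ts) >= c.ttl {
            delete(c.seen, k)
        }
    }
    c.seen[key] = now
    return false
}
```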
@rebelArtists makes sense, Dan. For an in-memory solution I would propose storing a linked list in the conn struct:

```go
// conn represents an individual connection with a peer.
type conn struct {
    sensorID  string
    node      *enode.Node
    logger    zerolog.Logger
    rw        ethp2p.MsgReadWriter
    db        database.Database
    head      *HeadBlock
    headMutex *sync.RWMutex
    counter   *prometheus.CounterVec
    name      string

    // requests is used to store the request ID and the block hash. This is used
    // when fetching block bodies because the eth protocol block bodies do not
    // contain information about the block hash.
    requests   *list.List
    requestNum uint64

+   blockHashes *list.List

    // oldestBlock stores the first block the sensor has seen so when fetching
    // parent blocks, it does not request blocks older than this.
    oldestBlock *types.Header
}
```

Let me know your thoughts.
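For context, a rough sketch of how the proposed blockHashes list might be used to dedupe, assuming it holds `common.Hash` values and is capped at an arbitrary size; `seenBlockHash` and `maxBlockHashes` are hypothetical names, not from the PR.

```go
package dedupe

import (
    "container/list"

    "github.com/ethereum/go-ethereum/common"
)

// maxBlockHashes caps the list so memory stays bounded; the value is an
// arbitrary placeholder.
const maxBlockHashes = 1024

// seenBlockHash returns true if hash is already in blockHashes; otherwise it
// appends the hash and evicts the oldest entry once the cap is exceeded.
func seenBlockHash(blockHashes *list.List, hash common.Hash) bool {
    for e := blockHashes.Front(); e != nil; e = e.Next() {
        if e.Value.(common.Hash) == hash {
            return true
        }
    }
    blockHashes.PushBack(hash)
    if blockHashes.Len() > maxBlockHashes {
        blockHashes.Remove(blockHashes.Front())
    }
    return false
}
```

A plain list keeps memory bounded without a companion map, at the cost of O(n) membership checks; pairing the list with a map (LRU-style) would trade a little memory for faster lookups.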
It's a good thought, and it would keep memory requirements lower. I will explore this direction today.
@minhd-vu new draft PR with a first stab at the linked list implementation: https://github.com/0xPolygon/polygon-cli/pull/423/files. Curious to hear your thoughts when you get a chance.
Closing this in favor of #423
Description
Testing
Smoke tested locally; no Go errors on run.
Unclear how to test event writing locally.