cost control: dedupe writeEvents #422

Closed · wants to merge 5 commits · Changes from 2 commits
100 changes: 95 additions & 5 deletions p2p/database/datastore.go
@@ -3,7 +3,9 @@ package database
import (
"context"
"fmt"
"hash/fnv"
"math/big"
"sync"
"time"

"cloud.google.com/go/datastore"
@@ -127,6 +129,76 @@ type DatastoreOptions struct {
TTL time.Duration
}

const (
shardCount = 32
cleanupInterval = 10 * time.Minute // Interval between cleanups
entryTTL = 1 * time.Hour // TTL for each entry
)

var (
dedupMaps [shardCount]map[string]time.Time
mutexes [shardCount]sync.Mutex
)

// Initialize dedupMaps
func init() {
// Initialize each shard in dedupMaps
for i := range dedupMaps {
dedupMaps[i] = make(map[string]time.Time)
}
// Start the cleanup routine to periodically remove stale entries
StartCleanupRoutine()
}

// getShard retrieves the shard and mutex for a given key
func getShard(key string) (map[string]time.Time, *sync.Mutex) {
hash := fnv.New32a()
hash.Write([]byte(key))
shardIndex := hash.Sum32() % shardCount
return dedupMaps[shardIndex], &mutexes[shardIndex]
}

// DeduplicateAndWriteEvent writes the event only if it is the earliest observation of this hash from this peer for this sensor.
func (d *Datastore) DeduplicateAndWriteEvent(peer *enode.Node, eventKind string, hash common.Hash, hashKind string, tfs time.Time) {
key := fmt.Sprintf("%s_%s_%s", d.sensorID, peer.URLv4(), hash.Hex())
shard, mu := getShard(key)

mu.Lock()
defer mu.Unlock()

if existingTime, found := shard[key]; found && !tfs.Before(existingTime) {
return // Skip if not the earliest
}
shard[key] = tfs
d.writeEvent(peer, eventKind, hash, hashKind, tfs)
}

// cleanupDedupMap periodically removes expired entries from each shard.
func cleanupDedupMap() {
for {
time.Sleep(cleanupInterval) // Wait for the next cleanup cycle

expiryTime := time.Now().Add(-entryTTL)
for i := 0; i < shardCount; i++ {
mu := &mutexes[i]
shard := dedupMaps[i]

mu.Lock()
for key, timestamp := range shard {
if timestamp.Before(expiryTime) {
delete(shard, key) // Remove stale entry
Contributor:

The thing I encountered before is that delete on maps doesn't actually decrease the size of the map (but it does clear the key). It retains the maximum size that it was, which caused some memory issues in earlier sensor iterations.

Contributor Author:

Interesting, thanks for pointing that out; will reexamine.

Contributor Author:

Adjusted per your feedback.
}
}
mu.Unlock()
}
}
}

// StartCleanupRoutine starts the cleanup routine as a goroutine
func StartCleanupRoutine() {
go cleanupDedupMap()
}
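
One way to address the reviewer's note above about delete not shrinking a map is to rebuild each shard during cleanup instead of deleting keys in place, so the old, oversized backing storage becomes garbage. The following is a minimal sketch, not code from the later commits; it reuses the names from this diff (dedupMaps, mutexes, shardCount, entryTTL) and assumes writers re-fetch the shard map under the lock (for example by having getShard return the shard index) so they never write into a map that was just swapped out.

func rebuildExpiredShards() {
	expiryTime := time.Now().Add(-entryTTL)
	for i := 0; i < shardCount; i++ {
		mutexes[i].Lock()
		old := dedupMaps[i]
		fresh := make(map[string]time.Time, len(old))
		for key, timestamp := range old {
			if !timestamp.Before(expiryTime) {
				fresh[key] = timestamp // keep only unexpired entries
			}
		}
		// Swap in the rebuilt map; the old map and its retained capacity can now be collected.
		dedupMaps[i] = fresh
		mutexes[i].Unlock()
	}
}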

// NewDatastore connects to datastore and creates the client. This should
// only be called once unless trying to write to different databases.
func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
@@ -158,7 +230,7 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ
if d.ShouldWriteBlockEvents() {
d.jobs <- struct{}{}
go func() {
d.DeduplicateAndWriteEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, tfs)
<-d.jobs
}()
}
@@ -453,22 +525,40 @@ func (d *Datastore) writeEvent(peer *enode.Node, eventKind string, hash common.H
// on the provided eventKind and hashKind. This is similar to writeEvent but
// batches the request.
func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind string, hashes []common.Hash, hashKind string, tfs time.Time) {
// Create slices to hold only deduplicated keys and events
keys := make([]*datastore.Key, 0, len(hashes))
events := make([]*DatastoreEvent, 0, len(hashes))

for _, hash := range hashes {
// Generate deduplication key
key := fmt.Sprintf("%s_%s_%s", d.sensorID, peer.URLv4(), hash.Hex())

// Determine the shard for this key and retrieve the associated map and mutex
shardMap, shardMutex := getShard(key)

// Lock the specific shard mutex for thread-safe access
shardMutex.Lock()
if existingTime, found := shardMap[key]; found && !tfs.Before(existingTime) {
// If found and not the earliest, skip
shardMutex.Unlock()
continue
}
// Either add a new event or update to an earlier time in the specific shard
shardMap[key] = tfs
shardMutex.Unlock()

// Proceed with writing to the datastore
keys = append(keys, datastore.IncompleteKey(eventKind, nil))
events = append(events, &DatastoreEvent{
SensorId: d.sensorID,
PeerId: peer.URLv4(),
Hash: datastore.NameKey(hashKind, hash.Hex(), nil),
Time: tfs,
TTL: tfs.Add(d.ttl),
})
}

// Perform batch write for deduplicated events only
if _, err := d.client.PutMulti(ctx, keys, events); err != nil {
log.Error().Err(err).Msgf("Failed to write to %v", eventKind)
}
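
For illustration only (not code from this PR), a self-contained sketch of the dedup check used above: the same FNV-based shard selection plus a helper that reports whether a timestamp is the earliest seen for a key. A later duplicate is skipped, while an earlier observation still goes through.

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
	"time"
)

const shardCount = 32

var (
	dedupMaps [shardCount]map[string]time.Time
	mutexes   [shardCount]sync.Mutex
)

func init() {
	for i := range dedupMaps {
		dedupMaps[i] = make(map[string]time.Time)
	}
}

// shouldWrite reports whether tfs is the earliest timestamp seen for key,
// recording it if so. It mirrors the check in DeduplicateAndWriteEvent and writeEvents.
func shouldWrite(key string, tfs time.Time) bool {
	h := fnv.New32a()
	h.Write([]byte(key))
	i := h.Sum32() % shardCount

	mutexes[i].Lock()
	defer mutexes[i].Unlock()

	if existing, found := dedupMaps[i][key]; found && !tfs.Before(existing) {
		return false // same or later timestamp: treat as a duplicate and skip
	}
	dedupMaps[i][key] = tfs
	return true
}

func main() {
	key := "sensor1_enode://abc_0xdeadbeef" // hypothetical dedup key for the example
	now := time.Now()

	fmt.Println(shouldWrite(key, now))                   // true: first observation
	fmt.Println(shouldWrite(key, now.Add(time.Second)))  // false: later duplicate is skipped
	fmt.Println(shouldWrite(key, now.Add(-time.Second))) // true: earlier observation wins
}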