forked from viccon/sturdyc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcache.go
308 lines (277 loc) · 8.89 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
package sturdyc
import (
"context"
"log/slog"
"sync"
"time"
xxhash "github.com/cespare/xxhash/v2"
)
// FetchFn Fetch represents a function that can be used to fetch a single record from a data source.
type FetchFn[T any] func(ctx context.Context) (T, error)
// BatchFetchFn represents a function that can be used to fetch multiple records from a data source.
type BatchFetchFn[T any] func(ctx context.Context, ids []string) (map[string]T, error)
type BatchResponse[T any] map[string]T
// KeyFn is called invoked for each record that a batch fetch
// operation returns. It is used to create unique cache keys.
type KeyFn func(id string) string
// Config represents the configuration that can be applied to the cache.
type Config struct {
clock Clock
evictionInterval time.Duration
disableContinuousEvictions bool
metricsRecorder DistributedMetricsRecorder
log Logger
refreshInBackground bool
minRefreshTime time.Duration
maxRefreshTime time.Duration
retryBaseDelay time.Duration
storeMissingRecords bool
bufferRefreshes bool
batchMutex sync.Mutex
bufferSize int
bufferTimeout time.Duration
permutationBufferMap map[string]*buffer
useRelativeTimeKeyFormat bool
keyTruncation time.Duration
getSize func() int
distributedStorage DistributedStorageWithDeletions
distributedEarlyRefreshes bool
distributedRefreshAfterDuration time.Duration
}
// Client represents a cache client that can be used to store and retrieve values.
type Client[T any] struct {
*Config
shards []*shard[T]
nextShard int
inFlightMutex sync.Mutex
inFlightBatchMutex sync.Mutex
inFlightMap map[string]*inFlightCall[T]
inFlightBatchMap map[string]*inFlightCall[map[string]T]
}
// New creates a new Client instance with the specified configuration.
//
// `capacity` defines the maximum number of entries that the cache can store.
// `numShards` Is used to set the number of shards. Has to be greater than 0.
// `ttl` Sets the time to live for each entry in the cache. Has to be greater than 0.
// `evictionPercentage` Percentage of items to evict when the cache exceeds its capacity.
// `opts` allows for additional configurations to be applied to the cache client.
func New[T any](capacity, numShards int, ttl time.Duration, evictionPercentage int, opts ...Option) *Client[T] {
client := &Client[T]{
inFlightMap: make(map[string]*inFlightCall[T]),
inFlightBatchMap: make(map[string]*inFlightCall[map[string]T]),
}
// Create a default configuration, and then apply the options.
cfg := &Config{
clock: NewClock(),
evictionInterval: ttl / time.Duration(numShards),
getSize: client.Size,
log: slog.Default(),
}
// Apply the options to the configuration.
client.Config = cfg
for _, opt := range opts {
opt(cfg)
}
validateConfig(capacity, numShards, ttl, evictionPercentage, cfg)
shardSize := capacity / numShards
shards := make([]*shard[T], numShards)
for i := 0; i < numShards; i++ {
shards[i] = newShard[T](shardSize, ttl, evictionPercentage, cfg)
}
client.shards = shards
client.nextShard = 0
// Run evictions on the shards in a separate goroutine.
if !cfg.disableContinuousEvictions {
client.performContinuousEvictions()
}
return client
}
// performContinuousEvictions is going to be running in a separate goroutine that we're going to prevent from ever exiting.
func (c *Client[T]) performContinuousEvictions() {
go func() {
ticker, stop := c.clock.NewTicker(c.evictionInterval)
defer stop()
for range ticker {
c.shards[c.nextShard].evictExpired()
c.nextShard = (c.nextShard + 1) % len(c.shards)
}
}()
}
// getShard returns the shard that should be used for the specified key.
func (c *Client[T]) getShard(key string) *shard[T] {
hash := xxhash.Sum64String(key)
shardIndex := hash % uint64(len(c.shards))
c.reportShardIndex(int(shardIndex))
return c.shards[shardIndex]
}
// getWithState retrieves a single value from the cache and returns additional
// information about the state of the record. The state includes whether the record
// exists, if it has been marked as missing, and if it is due for a refresh.
func (c *Client[T]) getWithState(key string) (value T, exists, markedAsMissing, refresh bool) {
shard := c.getShard(key)
val, exists, markedAsMissing, refresh := shard.get(key)
c.reportCacheHits(exists, markedAsMissing, refresh)
return val, exists, markedAsMissing, refresh
}
// Get retrieves a single value from the cache.
//
// Parameters:
//
// key - The key to be retrieved.
//
// Returns:
//
// The value corresponding to the key and a boolean indicating if the value was found.
func (c *Client[T]) Get(key string) (T, bool) {
shard := c.getShard(key)
val, ok, markedAsMissing, refresh := shard.get(key)
c.reportCacheHits(ok, markedAsMissing, refresh)
return val, ok && !markedAsMissing
}
// GetMany retrieves multiple values from the cache.
//
// Parameters:
//
// keys - The list of keys to be retrieved.
//
// Returns:
//
// A map of keys to their corresponding values.
func (c *Client[T]) GetMany(keys []string) map[string]T {
records := make(map[string]T, len(keys))
for _, key := range keys {
if value, ok := c.Get(key); ok {
records[key] = value
}
}
return records
}
// GetManyKeyFn follows the same API as GetOrFetchBatch and PassthroughBatch.
// You provide it with a slice of IDs and a keyFn, which is applied to create
// the cache key. The returned map uses the IDs as keys instead of the cache
// key. If you've used ScanKeys to retrieve the actual keys, you can retrieve
// the records using GetMany instead.
//
// Parameters:
//
// ids - The list of IDs to be retrieved.
// keyFn - A function that generates the cache key for each ID.
//
// Returns:
//
// A map of IDs to their corresponding values.
func (c *Client[T]) GetManyKeyFn(ids []string, keyFn KeyFn) map[string]T {
records := make(map[string]T, len(ids))
for _, id := range ids {
if value, ok := c.Get(keyFn(id)); ok {
records[id] = value
}
}
return records
}
// Set writes a single value to the cache.
//
// Parameters:
//
// key - The key to be set.
// value - The value to be associated with the key.
//
// Returns:
//
// A boolean indicating if the set operation triggered an eviction.
func (c *Client[T]) Set(key string, value T) bool {
shard := c.getShard(key)
return shard.set(key, value, false)
}
// StoreMissingRecord writes a single value to the cache. Returns true if it triggered an eviction.
func (c *Client[T]) StoreMissingRecord(key string) bool {
shard := c.getShard(key)
var zero T
return shard.set(key, zero, true)
}
// SetMany writes a map of key-value pairs to the cache.
//
// Parameters:
//
// records - A map of keys to values to be set in the cache.
//
// Returns:
//
// A boolean indicating if any of the set operations triggered an eviction.
func (c *Client[T]) SetMany(records map[string]T) bool {
var triggeredEviction bool
for key, value := range records {
evicted := c.Set(key, value)
if evicted {
triggeredEviction = true
}
}
return triggeredEviction
}
// SetManyKeyFn follows the same API as GetOrFetchBatch and PassthroughBatch.
// It takes a map of records where the keyFn is applied to each key in the map
// before it's stored in the cache.
//
// Parameters:
//
// records - A map of IDs to values to be set in the cache.
// cacheKeyFn - A function that generates the cache key for each ID.
//
// Returns:
//
// A boolean indicating if any of the set operations triggered an eviction.
func (c *Client[T]) SetManyKeyFn(records map[string]T, cacheKeyFn KeyFn) bool {
var triggeredEviction bool
for id, value := range records {
evicted := c.Set(cacheKeyFn(id), value)
if evicted {
triggeredEviction = true
}
}
return triggeredEviction
}
// ScanKeys returns a list of all keys in the cache.
//
// Returns:
//
// A slice of strings representing all the keys in the cache.
func (c *Client[T]) ScanKeys() []string {
keys := make([]string, 0, c.Size())
for _, shard := range c.shards {
keys = append(keys, shard.keys()...)
}
return keys
}
// Size returns the number of entries in the cache.
//
// Returns:
//
// An integer representing the total number of entries in the cache.
func (c *Client[T]) Size() int {
var sum int
for _, shard := range c.shards {
sum += shard.size()
}
return sum
}
// Delete removes a single entry from the cache.
//
// Parameters:
//
// key: The key of the entry to be removed.
func (c *Client[T]) Delete(key string) {
shard := c.getShard(key)
shard.delete(key)
}
// NumKeysInflight returns the number of keys that are currently being fetched.
//
// Returns:
//
// An integer representing the total number of keys that are currently being fetched.
func (c *Client[T]) NumKeysInflight() int {
c.inFlightMutex.Lock()
defer c.inFlightMutex.Unlock()
c.inFlightBatchMutex.Lock()
defer c.inFlightBatchMutex.Unlock()
return len(c.inFlightMap) + len(c.inFlightBatchMap)
}