-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
433 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
package main | ||
|
||
import ( | ||
"container/heap" | ||
"context" | ||
"math" | ||
"math/rand" | ||
"sync" | ||
"time" | ||
) | ||
|
||
type QueueItem struct { | ||
key string | ||
attempts int | ||
nextRetry time.Time | ||
} | ||
|
||
type Queue struct { | ||
mu sync.Mutex | ||
cond *sync.Cond | ||
items map[string]*QueueItem | ||
heap *priorityQueue | ||
} | ||
|
||
func NewQueue() *Queue { | ||
q := &Queue{ | ||
items: make(map[string]*QueueItem), | ||
heap: &priorityQueue{}, | ||
} | ||
heap.Init(q.heap) | ||
q.cond = sync.NewCond(&q.mu) | ||
return q | ||
} | ||
|
||
func (q *Queue) Run(ctx context.Context) { | ||
ticker := time.NewTicker(time.Millisecond * 100) | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-ticker.C: | ||
q.mu.Lock() | ||
if q.heap.Len() == 0 { | ||
q.mu.Unlock() | ||
continue | ||
} | ||
|
||
nextItem := (*q.heap)[0] | ||
delta := nextItem.nextRetry.Sub(time.Now()) | ||
if delta > 0 { | ||
ticker.Reset(delta) | ||
q.mu.Unlock() | ||
continue | ||
} | ||
|
||
q.cond.Signal() | ||
q.mu.Unlock() | ||
} | ||
} | ||
} | ||
|
||
func (q *Queue) Add(key string) { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
if _, exists := q.items[key]; !exists { | ||
item := &QueueItem{key: key, attempts: 0} | ||
q.items[key] = item | ||
heap.Push(q.heap, item) | ||
q.cond.Signal() | ||
} | ||
} | ||
|
||
func (q *Queue) Done(key string) { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
if item, exists := q.items[key]; exists { | ||
delete(q.items, key) | ||
q.removeFromHeap(item) | ||
} | ||
} | ||
|
||
func (q *Queue) Get() string { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
for { | ||
if q.heap.Len() == 0 { | ||
q.cond.Wait() | ||
} else { | ||
item := heap.Pop(q.heap).(*QueueItem) | ||
if item.nextRetry.Before(time.Now()) { | ||
delete(q.items, item.key) | ||
return item.key | ||
} | ||
heap.Push(q.heap, item) | ||
q.cond.Wait() | ||
} | ||
} | ||
} | ||
|
||
func (q *Queue) Retry(key string) { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
if item, exists := q.items[key]; exists { | ||
item.attempts++ | ||
item.nextRetry = time.Now().Add(q.exponentialBackoff(item.attempts)) | ||
heap.Push(q.heap, item) | ||
q.cond.Signal() | ||
} | ||
} | ||
|
||
func (q *Queue) exponentialBackoff(attempts int) time.Duration { | ||
backoff := float64(time.Second) | ||
jitter := backoff * 0.1 | ||
factor := math.Pow(2, float64(attempts)) | ||
return time.Duration(backoff*factor + jitter*factor*0.5*rand.Float64()) | ||
} | ||
|
||
func (q *Queue) removeFromHeap(item *QueueItem) { | ||
for i, heapItem := range *q.heap { | ||
if heapItem == item { | ||
heap.Remove(q.heap, i) | ||
break | ||
} | ||
} | ||
} | ||
|
||
type priorityQueue []*QueueItem | ||
|
||
func (pq priorityQueue) Len() int { return len(pq) } | ||
func (pq priorityQueue) Less(i, j int) bool { | ||
return pq[i].nextRetry.Before(pq[j].nextRetry) | ||
} | ||
func (pq priorityQueue) Swap(i, j int) { | ||
pq[i], pq[j] = pq[j], pq[i] | ||
} | ||
func (pq *priorityQueue) Push(x interface{}) { | ||
item := x.(*QueueItem) | ||
*pq = append(*pq, item) | ||
} | ||
func (pq *priorityQueue) Pop() interface{} { | ||
old := *pq | ||
n := len(old) | ||
item := old[n-1] | ||
*pq = old[0 : n-1] | ||
return item | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"math" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestAddingSingleItem(t *testing.T) { | ||
q := NewQueue() | ||
q.Add("item1") | ||
|
||
key := q.Get() | ||
if key != "item1" { | ||
t.Errorf("Expected 'item1', got %s", key) | ||
} | ||
} | ||
|
||
func TestAddMultipleItems(t *testing.T) { | ||
q := NewQueue() | ||
q.Add("item1") | ||
q.Add("item2") | ||
|
||
key1 := q.Get() | ||
key2 := q.Get() | ||
|
||
if key1 == key2 { | ||
t.Errorf("Expected different items, got the same item twice: %s", key1) | ||
} | ||
if (key1 != "item1" && key1 != "item2") || (key2 != "item1" && key2 != "item2") { | ||
t.Errorf("Unexpected items: %s, %s", key1, key2) | ||
} | ||
} | ||
|
||
func TestRetryWithBackoff(t *testing.T) { | ||
q := NewQueue() | ||
go q.Run(context.TODO()) | ||
q.Add("item1") | ||
q.Retry("item1") | ||
|
||
// Get should wait for the retry backoff duration before returning the item. | ||
start := time.Now() | ||
key := q.Get() | ||
elapsed := time.Since(start) | ||
|
||
if key != "item1" { | ||
t.Errorf("Expected 'item1', got %s", key) | ||
} | ||
|
||
expectedBackoff := time.Second * 2 | ||
tolerance := 0.2 | ||
if !approxDuration(elapsed, expectedBackoff, tolerance) { | ||
t.Errorf("Expected retry backoff around %v, got %v", expectedBackoff, elapsed) | ||
} | ||
} | ||
|
||
func TestItemUniqueConstraint(t *testing.T) { | ||
q := NewQueue() | ||
q.Add("item1") | ||
q.Add("item1") // This should be ignored | ||
assert.Len(t, q.items, 1) | ||
} | ||
|
||
func TestConcurrentAddAndRetrieve(t *testing.T) { | ||
q := NewQueue() | ||
var wg sync.WaitGroup | ||
keys := []string{"item1", "item2", "item3"} | ||
|
||
for _, key := range keys { | ||
wg.Add(1) | ||
go func(key string) { | ||
defer wg.Done() | ||
q.Add(key) | ||
}(key) | ||
} | ||
|
||
wg.Wait() | ||
|
||
for i := 0; i < len(keys); i++ { | ||
key := q.Get() | ||
if key != "item1" && key != "item2" && key != "item3" { | ||
t.Errorf("Unexpected key retrieved: %s", key) | ||
} | ||
} | ||
} | ||
|
||
func TestDoneFunctionality(t *testing.T) { | ||
q := NewQueue() | ||
q.Add("item1") | ||
q.Done("item1") | ||
assert.Len(t, q.items, 0) | ||
} | ||
|
||
func TestExponentialBackoffFunction(t *testing.T) { | ||
q := NewQueue() | ||
|
||
backoff := q.exponentialBackoff(1) | ||
if !approxDuration(backoff, time.Second*2, 0.2) { | ||
t.Errorf("Expected backoff around 2s, got %v", backoff) | ||
} | ||
|
||
backoff = q.exponentialBackoff(2) | ||
if !approxDuration(backoff, time.Second*4, 0.2) { | ||
t.Errorf("Expected backoff around 4s, got %v", backoff) | ||
} | ||
} | ||
|
||
func approxDuration(d1, d2 time.Duration, tolerance float64) bool { | ||
return math.Abs(float64(d1-d2)) <= tolerance*float64(d1) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.