-
Notifications
You must be signed in to change notification settings - Fork 25
/
pool.go
100 lines (83 loc) · 2.14 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package wapc
import (
"context"
"errors"
"fmt"
"time"
"github.com/Workiva/go-datastructures/queue"
)
type (
// Pool is a wrapper around a ringbuffer of WASM modules
Pool struct {
rb *queue.RingBuffer
module Module
instances []Instance
}
InstanceInitialize func(instance Instance) error
)
// NewPool takes in compiled WASM module and a size and returns a pool
// containing `size` instances of that module.
func NewPool(ctx context.Context, module Module, size uint64, initializer ...InstanceInitialize) (*Pool, error) {
var initialize InstanceInitialize
if len(initializer) > 0 {
initialize = initializer[0]
}
rb := queue.NewRingBuffer(size)
instances := make([]Instance, size)
for i := uint64(0); i < size; i++ {
inst, err := module.Instantiate(ctx)
if err != nil {
return nil, err
}
if initialize != nil {
if err = initialize(inst); err != nil {
return nil, fmt.Errorf("could not initialize instance: %w", err)
}
}
ok, err := rb.Offer(inst)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("could not add module %d to module pool of size %d", i, size)
}
instances[i] = inst
}
return &Pool{
rb: rb,
module: module,
instances: instances,
}, nil
}
// Get returns a module from the pool if it can be retrieved
// within the passed timeout window, if not it returns an error
func (p *Pool) Get(timeout time.Duration) (Instance, error) {
instanceIface, err := p.rb.Poll(timeout)
if err != nil {
return nil, fmt.Errorf("get from pool timed out: %w", err)
}
inst, ok := instanceIface.(Instance)
if !ok {
return nil, errors.New("item retrieved from pool is not an instance")
}
return inst, nil
}
// Return takes a module and adds it to the pool
// This should only be called using a module
func (p *Pool) Return(inst Instance) error {
ok, err := p.rb.Offer(inst)
if err != nil {
return err
}
if !ok {
return errors.New("cannot return instance to full pool")
}
return nil
}
// Close closes down all the instances contained by the pool.
func (p *Pool) Close(ctx context.Context) {
p.rb.Dispose()
for _, inst := range p.instances {
inst.Close(ctx)
}
}