Skip to content

Commit

Permalink
the first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
RonlGett committed Sep 10, 2019
0 parents commit 5782b21
Show file tree
Hide file tree
Showing 24 changed files with 2,353 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.idea/*
/vendor
log
121 changes: 121 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# **redis-semaphore**

<br> Implements a semaphore using redis commands. The semaphore is blocking, not polling, and has a fair queue serving processes on a first-come, first-serve basis. </br>
<br> Implementation based on Redis BLPOP ability to block execution until queue is not empty or timeout reached. </br>

### **Redis Client**

redis-semaphore requires redis client provided by the user.
<br> It is not dependant on specific redis version, and can accept any implementation that satisfies its Redis interface. </br>
<br> Implementations of `go_redis` && `redis.v5` clients are already given in this repository for your convenience. </br>
<br> Providing nil object will result in validation error. </br>

### **Redis Keys**

redis-semaphore uses 4 keys to maintain semaphore lifecycle:

1. **name** - derived by the binding key given by user. Semaphores are separated in Redis by their names

2. **version** - In case of future possible updates & fixes, version key enables to differentiating between old and updated clients

3. **available resources queue name** - represents the queue name in redis holding list of free locks to use

4. **locked resources set name** - represents key in redis which under it all used locks and their expiration time will be stored

### **Num Connections**

due to the blocking nature of `blpop` command, note that it's very important to set size of redis connections pool that is higher
than number of expected concurrent locks at worst case.
<br> Exhausting all redis connections will result in a deadlock. </br>

### **Options**

#### **Logging**

redis-semaphore provides logging mechanism to enable monitoring in case needed.
<br> It is not dependant on specific log tool, and can accept any implementation that satisfies its Logger interface. </br>
<br> Note that Logger interface should support 3 types of log levels:
<ul>
<li> a. `Error` (0) - show only non blocking errors (errors that will not terminate semaphore process) </li>
<li> b. `Info` (1) - log only critical information (lock/unlock succeeded/failed, etc) </li>
<li> c. `Debug` (2) - verbose, include internal steps </li>
</ul>
<br> Implementation of `logrus` client is already given in this repository for your convenience. </br>
<br> logger is optional. In case user have no need for log, do not pass it in options </br>

#### **Settings**

The semaphore uses 4 settings to determine it's behavior, each of them can be overridden:

1. **`Expiration`** - redis-semaphore must have an expiration time to ensure that after a while all evidence of the semaphore will disappear and your redis server will not be cluttered with unused keys.
Also, it represents the maximum amount of time mutual exclusion is guaranteed. Value is set to 1 minute by default.

1. **`TryLockTimeout`** - each lock operation must be bounded by max running time and cannot block execution indefinitely. value is set to 30 seconds by default. This setting can be overridden to any duration between 1 second and semaphore expiration time.

2. **`MaxParallelResources`** - redis-semaphore allows to define a set number of processes inside the semaphore-protected block (1 by default). All those processes can run in the critical section simultaneously.

3. **`LockAttempts`** - user can choose to retry acquiring lock if timeout reached. All attempts will have the same timeout. Number of attempts is 1 be default (no retries).

### **Usage**

#### **Creating New Semaphore**

```
bindingKey = "my_lock_key"
redisClient := semaphoreredis.NewRedisV5Client(redis.NewClient(&redis.Options{Addr: "localhost:6379"}))
logger := semaphorelogger.NewLogrusLogger(logrus.New(), semaphorelogger.LogLevelInfo, bindingKey)
overrideSettings := semaphore.Settings{
TryLockTimeout: 20 * time.Second,
LockAttempts: 2,
MaxParallelResources: 1,
}
s, err := semaphore.New(bindingKey, redisClient, logger, overrideSettings)
```

<br> Creates a new Semaphore. Mandatory params are binding key and Redis client. Optional params are logger and overrides to the default settings. Validation error will be returned on invalid params. </br>
<br> After semaphore is created, its settings cannot be modified. If you wish to alter semaphore setting, it would require creating and new object.
Note that creating multiple semaphores with the same binding key but different `MaxParallelResources` setting will have no effect. The setting of the first semaphore that will acquire lock will be applied until this semaphore will be expired. </br>

#### **Lock & Unlock**

```
token, err := s.Lock()
isLockUsed, err := s.IsResourceLocked(token) //isLockUsed = true
numFreeLocks, err := s.GetNumAvailableResources() //numFreeLocks = MaxParallelResources - 1
err := s.Unlock(token) //don't forget this!
isLockUsed, err := s.IsResourceLocked(token) //isLockUsed = false
numFreeLocks, err := s.GetNumAvailableResources() //numFreeLocks = MaxParallelResources
```

<br> redis-semaphore enables separate lock & unlock operations. </br>
<br> Performing lock operation on the Semaphore creates all it's keys in redis if used for the first time or expired, and checks for expired locks otherwise (see expired resources section). </br>
<br> Lock function returns unique uuid representing the acquired lock. This string should be given as parameter to unlock function when we want to release the lock. </br>
<br> Resource will be locked until will be freed by unlocking it, or until semaphore will expire. </br>
<br> Performing lock or unlock oprations resets the semaphore's expiration time. </br>

#### **Execute With Mutex**
```
WithMutex(lockByKey string, redisClient Redis, logger Logger, safeCode func(), settings ...Settings) error
```
Wrapper for encapsulating semaphore internal implementation. Mandatory params are binding key, Redis client and block of code to run. Optional params are logger and settings overrides.
<br> Function will create new semaphore, acquire lock, run function in critical section, and then release lock.
If error occurred while running code block, unlock procedure will run all the same. </br>

#### **Custom Timeout**
```
token, err := s.LockWithCustomTimeout(5 * time.Second)
```
User can choose to acquire lock using the same Semaphore but with alternating timeout for each lock operation. The custom timeout is subjects to the same limitations as `TryLockTimeout` parameter.
Providing invalid timeout will result in validation error.

### **Expired Resources**

There are possible cases where non expired Semaphore will contain locks that passed their expiration time.
The main reason for that is the extension of the Semaphore's expiration upon lock & unlock operations.
Before every lock operation, expired resources (if exists) will be cleaned up and returned to available locks queue.

### **Trying Lock On Expired Semaphore**

Note that as opposed to locking algorithms that uses polling, in case semaphore expires while process awaits in the queue, it will be not possible to acquire lock!
Client will have to wait until timeout will be reached and then he will be able to lock successfully at the next attempt.
15 changes: 15 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module github.com/gtforge/redis-semaphore-go

go 1.12

require (
github.com/go-redis/redis v6.15.2+incompatible
github.com/golang/mock v1.3.1
github.com/onsi/ginkgo v1.9.0
github.com/onsi/gomega v1.6.0
github.com/pkg/errors v0.8.1
github.com/satori/go.uuid v1.2.0
github.com/sirupsen/logrus v1.4.2
github.com/tylerb/gls v0.0.0-20150407001822-e606233f194d
gopkg.in/redis.v5 v5.2.9
)
49 changes: 49 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.9.0 h1:SZjF721BByVj8QH636/8S2DnX4n0Re3SteMmw3N+tzc=
github.com/onsi/ginkgo v1.9.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.6.0 h1:8XTW0fcJZEq9q+Upcyws4JSGua2MFysCL5xkaSgHc+M=
github.com/onsi/gomega v1.6.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/tylerb/gls v0.0.0-20150407001822-e606233f194d h1:yYYPFFlbqxF5mrj5sEfETtM/Ssz2LTy0/VKlDdXYctc=
github.com/tylerb/gls v0.0.0-20150407001822-e606233f194d/go.mod h1:0MwyId/pXK5wkYYEXe7NnVknX+aNBuF73fLV3U0reU8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/redis.v5 v5.2.9 h1:MNZYOLPomQzZMfpN3ZtD1uyJ2IDonTTlxYiV/pEApiw=
gopkg.in/redis.v5 v5.2.9/go.mod h1:6gtv0/+A4iM08kdRfocWYB3bLX2tebpNtfKlFT6H4mY=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
167 changes: 167 additions & 0 deletions semaphore/constructor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package semaphore

import (
"fmt"
"time"

"github.com/gtforge/redis-semaphore-go/semaphore/semaphore-logger"

"github.com/gtforge/redis-semaphore-go/semaphore/semaphore-redis"

"github.com/pkg/errors"
)

const (
errLvl = semaphorelogger.LogLevelError
infoLvl = semaphorelogger.LogLevelInfo
debugLvl = semaphorelogger.LogLevelDebug
)

type Redis = semaphoreredis.Redis

type Logger = semaphorelogger.Logger

type semaphore struct {
lockByKey string
options Options
redis redisImpl
}

type redisImpl struct {
client Redis
keys []string
}

type Options struct {
TryLockTimeout time.Duration
LockAttempts int64
MaxParallelResources int64
Logger Logger
Expiration time.Duration
}

var defaultOptions = Options{
Expiration: 1 * time.Minute,
TryLockTimeout: 30 * time.Second,
LockAttempts: 1,
MaxParallelResources: 1,
Logger: semaphorelogger.NewEmptyLogger(),
}

func New(lockByKey string, redisClient Redis, options ...Options) (Semaphore, error) {
return create(lockByKey, redisClient, options...)
}

func create(lockByKey string, redisClient Redis, options ...Options) (*semaphore, error) {

s := &semaphore{
lockByKey: lockByKey,
options: setOptions(options...),
redis: redisImpl{client: redisClient},
}

err := s.validate()
if err != nil {
return nil, errors.Wrapf(err, "error in validating new semaphore")
}

s.setRedisKeys()

s.options.Logger.WithFields(map[string]interface{}{
"options": fmt.Sprintf("%+v", s.options),
}).Log(debugLvl, "new semaphore object created successfully")

return s, nil
}

func setOptions(overrides ...Options) Options {
options := defaultOptions

if len(overrides) == 0 {
return options
}

override := overrides[0]

if override.Expiration != 0 {
options.Expiration = override.Expiration
}

if override.TryLockTimeout != 0 {
options.TryLockTimeout = override.TryLockTimeout
}

if override.MaxParallelResources != 0 {
options.MaxParallelResources = override.MaxParallelResources
}

if override.LockAttempts != 0 {
options.LockAttempts = override.LockAttempts
}

if override.Logger != nil {
options.Logger = override.Logger
}

return options
}

func (s *semaphore) validate() error {
if s.lockByKey == "" {
return fmt.Errorf("lock by key field must be non empty")
}

if s.redis.client == nil {
return fmt.Errorf("redis client must be non nil")
}

if s.options.Expiration < time.Second {
return fmt.Errorf("expiration time must be at least 1 second, received %v", s.options.TryLockTimeout)
}

if s.options.TryLockTimeout < time.Second || s.options.TryLockTimeout > s.options.Expiration {
return fmt.Errorf("try lock timeout must be at least 1 second and smaller or equal to semaphore Expiration time, received %v", s.options.TryLockTimeout)
}

if s.options.MaxParallelResources <= 0 {
return fmt.Errorf("max parallel resources setting must be positive number, received %v", s.options.MaxParallelResources)
}

if s.options.LockAttempts <= 0 {
return fmt.Errorf("lock attempts setting must be positive number, received %v", s.options.LockAttempts)
}

return nil
}

func (s *semaphore) setRedisKeys() {
s.redis.keys = append(s.redis.keys, s.name(), s.availableQueueName(), s.lockedResourcesName(), s.version())
}

const (
namePrefix = "semaphore"
availableQueueNamePostfix = "available"
lockedQueueNamePostfix = "locked"
versionPostfix = "version"
releaseExpiredPostfix = "release_expired"
)

func (s *semaphore) name() string {
return fmt.Sprintf("%v:%v", namePrefix, s.lockByKey)
}

func (s *semaphore) availableQueueName() string {
return fmt.Sprintf("%v:%v", s.name(), availableQueueNamePostfix)
}

func (s *semaphore) lockedResourcesName() string {
return fmt.Sprintf("%v:%v", s.name(), lockedQueueNamePostfix)
}

func (s *semaphore) version() string {
return fmt.Sprintf("%v:%v", s.name(), versionPostfix)
}

func (s *semaphore) releaseExpiredLockName() string {
return fmt.Sprintf("%v:%v", s.name(), releaseExpiredPostfix)
}
Loading

0 comments on commit 5782b21

Please sign in to comment.