diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..fd438c2 --- /dev/null +++ b/storage.go @@ -0,0 +1,64 @@ +package taskq + +import ( + "context" + "sync" + "time" + + "github.com/hashicorp/golang-lru/simplelru" +) + +type Storage interface { + Exists(ctx context.Context, key string) bool +} + +var _ Storage = (*localStorage)(nil) +var _ Storage = (*redisStorage)(nil) + +// LOCAL + +type localStorage struct { + mu sync.Mutex + cache *simplelru.LRU +} + +func (s localStorage) Exists(_ context.Context, key string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if s.cache == nil { + var err error + s.cache, err = simplelru.NewLRU(128000, nil) + if err != nil { + panic(err) + } + } + + _, ok := s.cache.Get(key) + if ok { + return true + } + + s.cache.Add(key, nil) + return false +} + +// REDIS + +type redisStorage struct { + redis Redis +} + +func newRedisStorage(redis Redis) redisStorage { + return redisStorage{ + redis: redis, + } +} + +func (s redisStorage) Exists(ctx context.Context, key string) bool { + val, err := s.redis.SetNX(ctx, key, "", 24*time.Hour).Result() + if err != nil { + return true + } + return !val +} diff --git a/taskq.go b/taskq.go index 1f21366..026825e 100644 --- a/taskq.go +++ b/taskq.go @@ -4,11 +4,9 @@ import ( "context" "log" "os" - "sync" "time" "github.com/go-redis/redis/v8" - "github.com/hashicorp/golang-lru/simplelru" "github.com/vmihailenco/taskq/v3/internal" ) @@ -41,59 +39,3 @@ type Redis interface { ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd ScriptLoad(ctx context.Context, script string) *redis.StringCmd } - -type Storage interface { - Exists(ctx context.Context, key string) bool -} - -type redisStorage struct { - redis Redis -} - -var _ Storage = (*redisStorage)(nil) - -func newRedisStorage(redis Redis) redisStorage { - return redisStorage{ - redis: redis, - } -} - -func (s redisStorage) Exists(ctx context.Context, key string) bool { - if localCacheExists(key) { - return true - } - - val, err := s.redis.SetNX(ctx, key, "", 24*time.Hour).Result() - if err != nil { - return true - } - return !val -} - -//------------------------------------------------------------------------------ - -var ( - mu sync.Mutex - cache *simplelru.LRU -) - -func localCacheExists(key string) bool { - mu.Lock() - defer mu.Unlock() - - if cache == nil { - var err error - cache, err = simplelru.NewLRU(128000, nil) - if err != nil { - panic(err) - } - } - - _, ok := cache.Get(key) - if ok { - return true - } - - cache.Add(key, nil) - return false -}