Skip to content

Commit

Permalink
Issue-97 Fix deadlocks on close connection
Browse files Browse the repository at this point in the history
Issue-97 Format code and fix spell issues, add some error handlers
  • Loading branch information
valinurovam committed Oct 31, 2023
1 parent ce5d373 commit 88a8472
Show file tree
Hide file tree
Showing 29 changed files with 251 additions and 100 deletions.
2 changes: 1 addition & 1 deletion amqp/extended_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ package amqp

// NoRoute returns when a 'mandatory' message cannot be delivered to any queue.
// @see https://www.rabbitmq.com/amqp-0-9-1-errata.html#section_17
const NoRoute = 312
const NoRoute = 312
4 changes: 2 additions & 2 deletions auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func HashPassword(password string, authType string) (string, error) {
switch authType {
case authMD5:
h := md5.New()
// digest.Write never return any error, so skip error ckeck
// digest.Write never return any error, so skip error check
h.Write([]byte(password))
return hex.EncodeToString(h.Sum(nil)), nil
case authBcrypt:
Expand All @@ -63,7 +63,7 @@ func CheckPasswordHash(password, hash string, authType string) bool {
switch authType {
case authMD5:
h := md5.New()
// digest.Write never return any error, so skip error ckeck
// digest.Write never return any error, so skip error check
h.Write([]byte(password))
return hash == hex.EncodeToString(h.Sum(nil))
case authBcrypt:
Expand Down
4 changes: 1 addition & 3 deletions auth/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func TestParsePlain_Failed_WrongFormat(t *testing.T) {
}
}


func TestHashPassword_Failed(t *testing.T) {
password := t.Name()
_, err := HashPassword(password, t.Name())
Expand All @@ -39,7 +38,6 @@ func TestHashPassword_Failed(t *testing.T) {
}
}


func TestCheckPasswordHash_Bcrypt(t *testing.T) {
password := t.Name()
hash, err := HashPassword(password, authBcrypt)
Expand Down Expand Up @@ -99,4 +97,4 @@ func TestCheckFailed(t *testing.T) {
t.Fatal("Expected false on check password with wrong type")
}

}
}
2 changes: 1 addition & 1 deletion config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func defaultConfig() *Config {
Port: "15672",
},
Queue: Queue{
ShardSize: 8 << 10, // 8k
ShardSize: 8 << 10, // 8k
MaxMessagesInRAM: 10 * 8 << 10, // 10 buckets
},
Db: Db{
Expand Down
5 changes: 3 additions & 2 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package consumer

import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/sasha-s/go-deadlock"

"github.com/valinurovam/garagemq/amqp"
"github.com/valinurovam/garagemq/interfaces"
"github.com/valinurovam/garagemq/qos"
Expand All @@ -28,7 +29,7 @@ type Consumer struct {
noAck bool
channel interfaces.Channel
queue *queue.Queue
statusLock sync.RWMutex
statusLock deadlock.RWMutex
status int
qos []*qos.AmqpQos
consume chan struct{}
Expand Down
5 changes: 3 additions & 2 deletions exchange/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package exchange
import (
"bytes"
"fmt"
"sync"

"github.com/sasha-s/go-deadlock"

"github.com/valinurovam/garagemq/amqp"
"github.com/valinurovam/garagemq/binding"
Expand Down Expand Up @@ -46,7 +47,7 @@ type Exchange struct {
autoDelete bool
internal bool
system bool
bindLock sync.Mutex
bindLock deadlock.Mutex
bindings []*binding.Binding
metrics *MetricsState
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/dgraph-io/badger v1.6.2
github.com/rabbitmq/amqp091-go v1.7.0
github.com/sasha-s/go-deadlock v0.3.1
github.com/sirupsen/logrus v1.9.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.14.0
Expand All @@ -28,6 +29,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/petermattis/goid v0.0.0-20230904192822-1876fd5063bc // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3v
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU=
github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/petermattis/goid v0.0.0-20230904192822-1876fd5063bc h1:8bQZVK1X6BJR/6nYUPxQEP+ReTsceJTKizeuwjWOPUA=
github.com/petermattis/goid v0.0.0-20230904192822-1876fd5063bc/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand All @@ -188,6 +191,8 @@ github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
Expand Down
2 changes: 1 addition & 1 deletion interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

// Channel represents base channel public interface
type Channel interface {
SendContent(method amqp.Method, message *amqp.Message)
SendContent(method amqp.Method, message *amqp.Message) *amqp.Error
SendMethod(method amqp.Method)
NextDeliveryTag() uint64
AddUnackedMessage(dTag uint64, cTag string, queue string, message *amqp.Message)
Expand Down
8 changes: 7 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"strings"
"time"

"github.com/sasha-s/go-deadlock"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/valinurovam/garagemq/admin"
"github.com/valinurovam/garagemq/config"
"github.com/valinurovam/garagemq/metrics"
Expand All @@ -34,7 +36,7 @@ func init() {
levels = append(levels, l.String())
}
flag.String("log-file", "stdout", "Log file")
flag.String("log-level", "info", fmt.Sprintf("Log level (%s)", strings.Join(levels, ", ")))
flag.String("log-level", "debug", fmt.Sprintf("Log level (%s)", strings.Join(levels, ", ")))
flag.Bool("hprof", false, "Starts server with hprof profiler.")
flag.String("hprof-host", "0.0.0.0", "hprof profiler host.")
flag.String("hprof-port", "8080", "hprof profiler port.")
Expand All @@ -45,6 +47,10 @@ func init() {
}

func main() {
// server has an issue with locks deadlock
// solving is in progress
deadlock.Opts.Disable = true

if viper.GetBool("help") {
flag.Usage()
os.Exit(0)
Expand Down
5 changes: 3 additions & 2 deletions metrics/registry.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package metrics

import (
"sync"
"time"

"github.com/sasha-s/go-deadlock"
)

var r *TrackRegistry

// TrackRegistry is a registry of track counters or other track metrics
type TrackRegistry struct {
cntLock sync.Mutex
cntLock deadlock.Mutex
Counters map[string]*TrackCounter
trackLength int
trackTick *time.Ticker
Expand Down
5 changes: 3 additions & 2 deletions metrics/trackBuffer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package metrics

import (
"sync"
"time"

"github.com/sasha-s/go-deadlock"
)

// TrackItem implements tracked item with value and timestamp
Expand All @@ -15,7 +16,7 @@ type TrackItem struct {
type TrackBuffer struct {
track []*TrackItem
trackDiff []*TrackItem
lock sync.RWMutex
lock deadlock.RWMutex
pos int
length int

Expand Down
5 changes: 3 additions & 2 deletions msgstorage/msgstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package msgstorage
import (
"strconv"
"strings"
"sync"
"time"

"github.com/sasha-s/go-deadlock"

"github.com/valinurovam/garagemq/amqp"
"github.com/valinurovam/garagemq/interfaces"
)
Expand All @@ -16,7 +17,7 @@ import (
// If storage in confirm-mode - in every persisted message storage send confirm to vhost
type MsgStorage struct {
db interfaces.DbStorage
persistLock sync.Mutex
persistLock deadlock.Mutex
add map[string]*amqp.Message
update map[string]*amqp.Message
del map[string]*amqp.Message
Expand Down
4 changes: 2 additions & 2 deletions qos/qos.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package qos

import "sync"
import "github.com/sasha-s/go-deadlock"

// AmqpQos represents qos system
type AmqpQos struct {
sync.Mutex
deadlock.Mutex
prefetchCount uint16
currentCount uint16
prefetchSize uint32
Expand Down
8 changes: 5 additions & 3 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"sync/atomic"

"github.com/sasha-s/go-deadlock"

"github.com/valinurovam/garagemq/amqp"
"github.com/valinurovam/garagemq/config"
"github.com/valinurovam/garagemq/interfaces"
Expand Down Expand Up @@ -40,13 +42,13 @@ type Queue struct {
exclusive bool
autoDelete bool
durable bool
cmrLock sync.RWMutex
cmrLock deadlock.RWMutex
consumers []interfaces.Consumer
consumeExcl bool
call chan struct{}
wasConsumed bool
shardSize int
actLock sync.RWMutex
actLock deadlock.RWMutex
active bool
// persistent storage
msgPStorage interfaces.MsgStorage
Expand All @@ -58,7 +60,7 @@ type Queue struct {
queueLength int64

// lock for sync load swapped-messages from disk
loadSwapLock sync.Mutex
loadSwapLock deadlock.Mutex
maxMessagesInRAM uint64
lastStoredMsgID uint64
lastMemMsgID uint64
Expand Down
4 changes: 2 additions & 2 deletions safequeue/safequeue.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package safequeue

import (
"sync"
"github.com/sasha-s/go-deadlock"

"github.com/valinurovam/garagemq/amqp"
)
Expand All @@ -12,7 +12,7 @@ import (
// SafeQueue represents simple FIFO queue
// TODO Is that implementation faster? test simple slice queue
type SafeQueue struct {
sync.RWMutex
deadlock.RWMutex
shards [][]*amqp.Message
shardSize int
tailIdx int
Expand Down
Loading

0 comments on commit 88a8472

Please sign in to comment.