Skip to content

Commit

Permalink
fix: DO NOT init variable producer and consumer, only init them in Ne…
Browse files Browse the repository at this point in the history
…wConsumer and NewProducer
  • Loading branch information
Daniel Qin committed Mar 23, 2023
1 parent e17b401 commit 98007ca
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 15 deletions.
7 changes: 4 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ type Consumer struct {
consumers []*nsq.Consumer
}

var (
consumer = &Consumer{}
)
var ()

func NewConfig(addr, lookupdAddr, topic string, conCurrentCount int) *Config {
config := &Config{
Expand All @@ -46,6 +44,8 @@ func NewConfig(addr, lookupdAddr, topic string, conCurrentCount int) *Config {
}

func NewConsumer(config *Config, nsqConfig *nsq.Config) *Consumer {
consumer := &Consumer{}

handlerConCurrentMap := GetHandlerConCurrent()
// 注册每一个worker
for k, v := range GetHandlers() {
Expand All @@ -70,6 +70,7 @@ func NewConsumer(config *Config, nsqConfig *nsq.Config) *Consumer {

consumer.consumers = append(consumer.consumers, nsqConsumer)
}

return consumer
}

Expand Down
29 changes: 17 additions & 12 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,47 @@ import (
"github.com/nsqio/go-nsq"
)

// NsqProducer type
type NsqProducer struct {
// Producer type
type Producer struct {
config *nsq.Config
Producer *nsq.Producer
}

var (
producer = &NsqProducer{}
producer *Producer
)

// NewNsqProducer get producer
func NewNsqProducer(addr string, config *nsq.Config) *NsqProducer {
func NewProducer(addr string, config *nsq.Config) *Producer {
var err error
nsq, err := nsq.NewProducer(addr, config)
if err != nil {
panic(err.Error())
}

producer.Producer = nsq
producer.config = config
return &Producer{
config: config,
Producer: nsq,
}
}

return producer
// NewNsqProducer get producer
// Deprecated: use NewProducer instead
func NewNsqProducer(addr string, config *nsq.Config) *Producer {
return NewProducer(addr, config)
}

// GetProducer 返回producer
func GetProducer() *NsqProducer {
func GetProducer() *Producer {
return producer
}

// Stop producer
func (p *NsqProducer) Stop() {
func (p *Producer) Stop() {
p.Producer.Stop()
}

// Publish producer
func (p *NsqProducer) Publish(topic string, body []byte) error {
func (p *Producer) Publish(topic string, body []byte) error {
err := p.Producer.Publish(topic, body)
if err != nil {
logger.Error("publish message error:[%s],[%s], [%s]", topic, string(body), err.Error())
Expand All @@ -64,7 +69,7 @@ func (p *NsqProducer) Publish(topic string, body []byte) error {
}

// MultiPublish 批量发布
func (p *NsqProducer) MultiPublish(topic string, body [][]byte) error {
func (p *Producer) MultiPublish(topic string, body [][]byte) error {
err := p.Producer.MultiPublish(topic, body)
if err != nil {
logger.Error("MultiPublish message error:[%s], [%s]", topic, err.Error())
Expand Down

0 comments on commit 98007ca

Please sign in to comment.