Skip to content

Commit

Permalink
Merge pull request #17 from acjzz/revert-16-feature/handler_return
Browse files Browse the repository at this point in the history
Revert "Add return on handlers"
  • Loading branch information
acjzz authored Feb 15, 2020
2 parents 4e02788 + 2d7c03e commit f9b1d61
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 17 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ func main(){

for _, topicName := range topics {
// Register different Handler per each Topic as well as the Topics themselves
ge.AddTopic(topicName, func(topic string, obj interface{}) interface {} {
ge.AddTopic(topicName, func(topic string, obj interface{}) {
// Printf usage on the handlers is not recommended at all
// if you pretend to achieve near realtime streams
// In this example is for demonstration purposes only
fmt.Printf("Consumed '%v' from topic '%s'\n", obj, topic)
return nil
})
}

Expand Down
4 changes: 2 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ type consumer struct {
ctx context.Context
channel *chan internalMessage
logger *logrus.Entry
handler func(string, interface{}) interface{}
handler func(string, interface{})
}

func newConsumer(ctx context.Context, ch *chan internalMessage, handler func(string, interface{}) interface{}) *consumer {
func newConsumer(ctx context.Context, ch *chan internalMessage, handler func(string, interface{})) *consumer {
return &consumer{ctx, ch, NewLogger(ctx), handler, }
}

Expand Down
2 changes: 1 addition & 1 deletion consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func Test_consumer(t *testing.T) {
ctx = setLogLevelKey(ctx, logrus.InfoLevel)

var channel chan internalMessage
c := newConsumer(ctx, &channel, func(s string, i interface{}) interface{}{ return nil })
c := newConsumer(ctx, &channel, func(s string, i interface{}) {})
c.run()
cancel()
})
Expand Down
2 changes: 1 addition & 1 deletion engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (ge *Engine) Stop() {
ge.ctxCancel()
}

func (ge *Engine) AddTopic(name string, handler func(string, interface{}) interface{}, numConsumers ...int) {
func (ge *Engine) AddTopic(name string, handler func(string, interface{}), numConsumers ...int) {
name = strings.ToLower(name)
if _, ok := ge.topics[name]; !ok {
if len(numConsumers) > 0 {
Expand Down
8 changes: 3 additions & 5 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestEngine_Publish_Error(t *testing.T) {
tt.fields.logLevel,
)
if tt.startTopic {
ge.AddTopic(topicName, func(topic string, obj interface{}) interface{}{ return nil })
ge.AddTopic(topicName, func(topic string, obj interface{}){})
ge.Stop()
}
err := ge.Publish(tt.args.name, tt.args.obj)
Expand All @@ -75,13 +75,12 @@ func TestEngine_Publish(t *testing.T) {
topicName := "topic"
msg := "Test Message"

ge.AddTopic(topicName, func(topic string, obj interface{}) interface {}{
ge.AddTopic(topicName, func(topic string, obj interface{}){
if strings.Compare(fmt.Sprintf("%v", obj), msg) != 0 {
t.Errorf("publish() received = '%v', expected '%v'", obj, msg)
} else if strings.Compare(topicName, topic) != 0 {
t.Errorf("publish() received from topic '%s', expected '%s'", topic, topicName)
}
return nil
})

err := ge.Publish(topicName, msg)
Expand Down Expand Up @@ -118,12 +117,11 @@ func TestEngine_Publish_Multiple_Topics(t *testing.T) {
ge := NewEngine(tt.name, logrus.ErrorLevel, )
for i := 0; i < tt.numTopics; i++ {
topicName := fmt.Sprintf("%s-%d",baseTopicName,i)
ge.AddTopic(topicName, func(topic string, obj interface{}) interface {} {
ge.AddTopic(topicName, func(topic string, obj interface{}) {
received := obj.(Message)
if strings.Compare(received.topic, topic) != 0 {
t.Errorf("publish() received from topic '%s', expected '%s'", received.topic, topicName)
}
return nil
}, tt.numConsumers)
}

Expand Down
3 changes: 1 addition & 2 deletions examples/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@ func main(){

for _, topicName := range topics {
// Register different Handler per each Topic as well as the Topics themselves
ge.AddTopic(topicName, func(topic string, obj interface{}) interface {} {
ge.AddTopic(topicName, func(topic string, obj interface{}) {
// Printf usage on the handlers is not recommended at all
// if you pretend to achieve near realtime streams
// In this example is for demonstration purposes only
fmt.Printf("Consumed '%v' from topic '%s'\n", obj, topic)
return nil
})
}

Expand Down
4 changes: 2 additions & 2 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ type Topic struct {
channel chan internalMessage
consumers []*consumer
producer *producer
handler func(string, interface{}) interface{}
handler func(string, interface{})
}

func NewTopic(ctx context.Context, name string, handler func(string, interface{}) interface{}, numConsumers ...int) *Topic {
func NewTopic(ctx context.Context, name string, handler func(string, interface{}), numConsumers ...int) *Topic {
var channelTopic chan internalMessage
if len(numConsumers) > 0 {
channelTopic = make(chan internalMessage, numConsumers[0])
Expand Down
3 changes: 1 addition & 2 deletions topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ func TestTopic_Publish(t *testing.T) {
topic := NewTopic(
ctx,
topicName,
func(topic string, obj interface{}) interface{} {
func(topic string, obj interface{}){
if strings.Compare(fmt.Sprintf("%v", obj), msg.value.(string)) != 0 {
t.Errorf("publish() received = '%v', expected '%v'", obj, msg)
} else if strings.Compare(topicName, topic) != 0 {
t.Errorf("publish() received from topic '%s', expected '%s'", topic, topicName)
}
return nil
},
)

Expand Down

0 comments on commit f9b1d61

Please sign in to comment.