Skip to content

Commit

Permalink
feat (WaitFor Actions, Nodes and Services) broker, registry and context.
Browse files Browse the repository at this point in the history
+ improved tests
  • Loading branch information
pentateu committed May 29, 2019
1 parent 3fc09bf commit 419bcf3
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 43 deletions.
45 changes: 40 additions & 5 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (broker *ServiceBroker) WaitFor(services ...string) error {
return nil
}

// WaitFor : wait for all services to be available
// WaitForNodes : wait for all nodes to be available
func (broker *ServiceBroker) WaitForNodes(nodes ...string) error {
for _, nodeID := range nodes {
if err := broker.waitForNode(nodeID); err != nil {
Expand All @@ -230,6 +230,20 @@ func (broker *ServiceBroker) WaitForNodes(nodes ...string) error {
return nil
}

func (broker *ServiceBroker) KnowAction(action string) bool {
return broker.registry.KnowAction(action)
}

//WaitForActions : wait for all actions to be available
func (broker *ServiceBroker) WaitForActions(actions ...string) error {
for _, action := range actions {
if err := broker.waitAction(action); err != nil {
return err
}
}
return nil
}

//waitForService wait for a service to be available
func (broker *ServiceBroker) waitForService(service string) error {
start := time.Now()
Expand All @@ -247,6 +261,23 @@ func (broker *ServiceBroker) waitForService(service string) error {
return nil
}

//waitAction wait for an action to be available
func (broker *ServiceBroker) waitAction(action string) error {
start := time.Now()
for {
if broker.registry.KnowAction(action) {
break
}
if time.Since(start) > broker.config.WaitForDependenciesTimeout {
err := errors.New("waitAction() - Timeout ! action: " + action)
broker.logger.Error(err)
return err
}
time.Sleep(time.Microsecond)
}
return nil
}

//waitForNode wait for a node to be available
func (broker *ServiceBroker) waitForNode(nodeID string) error {
start := time.Now()
Expand Down Expand Up @@ -490,10 +521,14 @@ func (broker *ServiceBroker) createDelegates() *moleculer.BrokerDelegates {
HandleRemoteEvent: func(context moleculer.BrokerContext) {
broker.registry.HandleRemoteEvent(context)
},
ServiceForAction: func(name string) *moleculer.ServiceSchema {
svc := broker.registry.ServiceForAction(name)
if svc != nil {
return svc.Schema()
ServiceForAction: func(name string) []*moleculer.ServiceSchema {
svcs := broker.registry.ServiceForAction(name)
if svcs != nil {
result := make([]*moleculer.ServiceSchema, len(svcs))
for i, svc := range svcs {
result[i] = svc.Schema()
}
return result
}
return nil
},
Expand Down
4 changes: 2 additions & 2 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func metricsPayload(brokerContext moleculer.BrokerContext) map[string]interface{
_, isAction := contextMap["action"]
if isAction {
action := contextMap["action"].(string)
svc := rawContext.BrokerDelegates().ServiceForAction(action)
svcs := rawContext.BrokerDelegates().ServiceForAction(action)
contextMap["action"] = map[string]string{"name": action}
contextMap["service"] = map[string]string{"name": svc.Name, "version": svc.Version}
contextMap["service"] = map[string]string{"name": svcs[0].Name, "version": svcs[0].Version}
}
return contextMap
}
Expand Down
12 changes: 6 additions & 6 deletions metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ var _ = Describe("Metrics", func() {
Fail("Invalid event name")
}
}
delegates.ServiceForAction = func(string) *moleculer.ServiceSchema {
return &moleculer.ServiceSchema{
delegates.ServiceForAction = func(string) []*moleculer.ServiceSchema {
return []*moleculer.ServiceSchema{&moleculer.ServiceSchema{
Name: "math",
Version: "2",
}
}}
}
actionContext := context.BrokerContext(delegates).ChildActionContext("math.add", payload.New(nil))
result := payload.New(errors.New("some error"))
Expand Down Expand Up @@ -78,11 +78,11 @@ var _ = Describe("Metrics", func() {
Expect(context.EventName()).Should(Equal("metrics.trace.span.start"))
eventPayload = context.Payload()
}
delegates.ServiceForAction = func(string) *moleculer.ServiceSchema {
return &moleculer.ServiceSchema{
delegates.ServiceForAction = func(string) []*moleculer.ServiceSchema {
return []*moleculer.ServiceSchema{&moleculer.ServiceSchema{
Name: "math",
Version: "2",
}
}}
}
actionContext := context.BrokerContext(delegates).ChildActionContext("math.add", payload.New(nil))
metricStart(actionContext)
Expand Down
2 changes: 1 addition & 1 deletion moleculer.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ type isStartedFunc func() bool
type LocalNodeFunc func() Node
type ActionDelegateFunc func(context BrokerContext, opts ...Options) chan Payload
type EmitEventFunc func(context BrokerContext)
type ServiceForActionFunc func(string) *ServiceSchema
type ServiceForActionFunc func(string) []*ServiceSchema
type MultActionDelegateFunc func(callMaps map[string]map[string]interface{}) chan map[string]Payload
type BrokerContextFunc func() BrokerContext
type MiddlewareHandlerFunc func(name string, params interface{}) interface{}
Expand Down
32 changes: 11 additions & 21 deletions registry/actionCatalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,23 +111,6 @@ func (actionCatalog *ActionCatalog) listByName() map[string][]ActionEntry {
return result
}

func (actionCatalog *ActionCatalog) Find(name string, local bool) *ActionEntry {
list, exists := actionCatalog.actions.Load(name)
if !exists {
return nil
}
actions := list.([]ActionEntry)
if !local && len(actions) > 0 {
return &actions[0]
}
for _, action := range actions {
if action.isLocal {
return &action
}
}
return nil
}

// Add a new action to the catalog.
func (actionCatalog *ActionCatalog) Add(action service.Action, service *service.Service, local bool) {
entry := ActionEntry{service.NodeID(), &action, local, service, actionCatalog.logger}
Expand Down Expand Up @@ -209,12 +192,11 @@ func (actionCatalog *ActionCatalog) printDebugActions() {

// Next find all actions registered in this node and use the strategy to select and return the best one to be called.
func (actionCatalog *ActionCatalog) Next(actionName string, stg strategy.Strategy) *ActionEntry {
list, exists := actionCatalog.actions.Load(actionName)
if !exists {
actionCatalog.logger.Debug("actionCatalog.Next() no entries found for name: ", actionName, " actionCatalog.actions: ", actionCatalog.actions)
actions := actionCatalog.Find(actionName)
if actions == nil {
actionCatalog.logger.Debug("actionCatalog.Next() action not found: ", actionName, " actionCatalog.actions: ", actionCatalog.actions)
return nil
}
actions := list.([]ActionEntry)
nodes := make([]strategy.Selector, len(actions))
for index, action := range actions {
nodes[index] = action
Expand All @@ -229,3 +211,11 @@ func (actionCatalog *ActionCatalog) Next(actionName string, stg strategy.Strateg
actionCatalog.logger.Debug("actionCatalog.Next() no entries selected for name: ", actionName, " actionCatalog.actions: ", actionCatalog.actions)
return nil
}

func (actionCatalog *ActionCatalog) Find(name string) []ActionEntry {
list, exists := actionCatalog.actions.Load(name)
if !exists {
return nil
}
return list.([]ActionEntry)
}
6 changes: 4 additions & 2 deletions registry/nodeService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package registry_test

import (
"fmt"
"time"

"github.com/moleculer-go/moleculer"
"github.com/moleculer-go/moleculer/test"
Expand Down Expand Up @@ -86,6 +87,7 @@ var _ = Describe("nodeService", func() {
scannerBroker.Start()
scannerBroker.WaitForNodes("node_printerBroker")
scannerBroker.WaitFor("printer")
time.Sleep(time.Millisecond)

result = <-scannerBroker.Call(action, params)
Expect(result.Exists()).Should(BeTrue())
Expand All @@ -95,10 +97,10 @@ var _ = Describe("nodeService", func() {
cpuBroker.Start()
cpuBroker.WaitForNodes("node_printerBroker", "node_scannerBroker")
cpuBroker.WaitFor("printer", "scanner")

time.Sleep(time.Millisecond)
result = <-cpuBroker.Call(action, params)
Expect(result.Exists()).Should(BeTrue())
Expect(snap.SnapshotMulti(fmt.Sprint(label, "cpuBroker"), transformer(result))).Should(Succeed())
Expect(snap.SnapshotMulti(fmt.Sprint(label, "cpuBroker"), transformer(result))).Should(Succeed()) //failed here perdeu o node priunter

close(done)
}
Expand Down
16 changes: 12 additions & 4 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func (registry *ServiceRegistry) KnowService(name string) bool {
return registry.services.FindByName(name)
}

func (registry *ServiceRegistry) KnowAction(name string) bool {
return registry.actions.Find(name) != nil
}

func (registry *ServiceRegistry) KnowNode(nodeID string) bool {
_, found := registry.nodes.findNode(nodeID)
return found
Expand Down Expand Up @@ -160,10 +164,14 @@ func (registry *ServiceRegistry) Start() {
}
}

func (registry *ServiceRegistry) ServiceForAction(name string) *service.Service {
action := registry.actions.Find(name, true)
if action != nil {
return action.Service()
func (registry *ServiceRegistry) ServiceForAction(name string) []*service.Service {
actions := registry.actions.Find(name)
if actions != nil {
result := make([]*service.Service, len(actions))
for i, action := range actions {
result[i] = action.Service()
}
return result
}
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package registry_test
import (
"os"
"sync"
"time"

"github.com/moleculer-go/cupaloy/v2"
bus "github.com/moleculer-go/goemitter"
Expand Down Expand Up @@ -237,10 +238,12 @@ var _ = Describe("Registry", func() {
}
})
<-step
cpuBroker.WaitForActions("scanner.scan", "printer.print")
time.Sleep(time.Millisecond)

contentToCompute := "Some long long text ..."
computeResult := <-printerBroker.Call("cpu.compute", contentToCompute)
Expect(computeResult.IsError()).ShouldNot(Equal(true))
Expect(computeResult.Error()).Should(Succeed())
Expect(computeResult.Value()).Should(Equal(contentToCompute))

//stopping broker B
Expand Down
2 changes: 1 addition & 1 deletion test/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"
)

var CounterCheckTimeout = 5 * time.Second
var CounterCheckTimeout = 10 * time.Second

func Counter() CounterCheck {
return CounterCheck{&sync.Mutex{}, make(map[string]int), make(map[string]int)}
Expand Down

0 comments on commit 419bcf3

Please sign in to comment.