Skip to content

Commit

Permalink
Merge pull request #59 from moleculer-go/feat/moleculerjs
Browse files Browse the repository at this point in the history
Feat/moleculerjs
  • Loading branch information
pentateu authored May 7, 2019
2 parents 819c642 + e307e42 commit 9ff7092
Show file tree
Hide file tree
Showing 62 changed files with 1,214 additions and 692 deletions.
6 changes: 0 additions & 6 deletions broker/.snapshots/broker-glob--func1-2-1-bkr1-results

This file was deleted.

6 changes: 0 additions & 6 deletions broker/.snapshots/broker-glob--func1-2-1-bkr2-results

This file was deleted.

27 changes: 14 additions & 13 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/moleculer-go/moleculer/context"
"github.com/moleculer-go/moleculer/metrics"
"github.com/moleculer-go/moleculer/middleware"
"github.com/moleculer-go/moleculer/options"
"github.com/moleculer-go/moleculer/payload"
"github.com/moleculer-go/moleculer/registry"
"github.com/moleculer-go/moleculer/serializer"
Expand Down Expand Up @@ -92,6 +91,8 @@ type ServiceBroker struct {

delegates *moleculer.BrokerDelegates

id string

localNode moleculer.Node
}

Expand Down Expand Up @@ -179,9 +180,8 @@ func (broker *ServiceBroker) createBrokerLogger() *log.Entry {
log.SetLevel(log.InfoLevel)
}

nodeID := broker.config.DiscoverNodeID()
brokerLogger := log.WithFields(log.Fields{
"broker": nodeID,
"broker": broker.id,
})
//broker.logger.Debug("Broker Log Setup -> Level", log.GetLevel(), " nodeID: ", nodeID)
return brokerLogger
Expand Down Expand Up @@ -228,7 +228,8 @@ func (broker *ServiceBroker) Start() {
broker.logger.Warn("broker.Start() called on a broker that already started!")
return
}
broker.logger.Info("Broker -> Starting...")
broker.logger.Info("Moleculer is starting...")
broker.logger.Info("Node ID: ", broker.localNode.GetID())

broker.middlewares.CallHandlers("brokerStarting", broker.delegates)

Expand All @@ -254,15 +255,15 @@ func (broker *ServiceBroker) Start() {
defer broker.middlewares.CallHandlers("brokerStarted", broker.delegates)

broker.started = true
broker.logger.Info("Broker -> Started !!!")
broker.logger.Info("Service Broker with ", len(broker.services), " service(s) started successfully.")
}

func (broker *ServiceBroker) Stop() {
if !broker.started {
broker.logger.Info("Broker is not started!")
return
}
broker.logger.Info("Broker -> Stopping...")
broker.logger.Info("Service Broker is stopping...")

broker.middlewares.CallHandlers("brokerStopping", broker.delegates)

Expand Down Expand Up @@ -334,13 +335,13 @@ func (broker *ServiceBroker) MCall(callMaps map[string]map[string]interface{}) c
}

// Call : invoke a service action and return a channel which will eventualy deliver the results ;)
func (broker *ServiceBroker) Call(actionName string, params interface{}, opts ...moleculer.OptionsFunc) chan moleculer.Payload {
func (broker *ServiceBroker) Call(actionName string, params interface{}, opts ...moleculer.Options) chan moleculer.Payload {
broker.logger.Trace("Broker - Call() actionName: ", actionName, " params: ", params, " opts: ", opts)
if !broker.IsStarted() {
panic(errors.New("Broker must be started before making calls :("))
}
actionContext := broker.rootContext.ChildActionContext(actionName, payload.New(params), options.Wrap(opts))
return broker.registry.LoadBalanceCall(actionContext, options.Wrap(opts))
actionContext := broker.rootContext.ChildActionContext(actionName, payload.New(params), opts...)
return broker.registry.LoadBalanceCall(actionContext, opts...)
}

func (broker *ServiceBroker) Emit(event string, params interface{}, groups ...string) {
Expand Down Expand Up @@ -400,6 +401,7 @@ func (broker *ServiceBroker) registerInternalMiddlewares() {
}

func (broker *ServiceBroker) init() {
broker.id = broker.config.DiscoverNodeID()
broker.logger = broker.createBrokerLogger()
broker.setupLocalBus()

Expand All @@ -410,7 +412,7 @@ func (broker *ServiceBroker) init() {
broker.logger.Debug("Config middleware after: \n", broker.config)

broker.delegates = broker.createDelegates()
broker.registry = registry.CreateRegistry(broker.delegates)
broker.registry = registry.CreateRegistry(broker.id, broker.delegates)
broker.localNode = broker.registry.LocalNode()
broker.rootContext = context.BrokerContext(broker.delegates)

Expand All @@ -423,8 +425,8 @@ func (broker *ServiceBroker) createDelegates() *moleculer.BrokerDelegates {
Bus: broker.LocalBus,
IsStarted: broker.IsStarted,
Config: broker.config,
ActionDelegate: func(context moleculer.BrokerContext, opts ...moleculer.OptionsFunc) chan moleculer.Payload {
return broker.registry.LoadBalanceCall(context, options.Wrap(opts))
ActionDelegate: func(context moleculer.BrokerContext, opts ...moleculer.Options) chan moleculer.Payload {
return broker.registry.LoadBalanceCall(context, opts...)
},
EmitEvent: func(context moleculer.BrokerContext) {
broker.registry.LoadBalanceEvent(context)
Expand Down Expand Up @@ -459,6 +461,5 @@ func New(userConfig ...*moleculer.Config) *ServiceBroker {
config := mergeConfigs(moleculer.DefaultConfig, userConfig)
broker := ServiceBroker{config: config}
broker.init()
broker.logger.Info("Broker - New() ")
return &broker
}
87 changes: 47 additions & 40 deletions context/contextFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/moleculer-go/moleculer"
"github.com/moleculer-go/moleculer/options"
"github.com/moleculer-go/moleculer/payload"
"github.com/moleculer-go/moleculer/util"

Expand All @@ -24,7 +23,7 @@ type Context struct {
groups []string
broadcast bool
params moleculer.Payload
meta *map[string]interface{}
meta moleculer.Payload
timeout int
level int
}
Expand All @@ -37,6 +36,7 @@ func BrokerContext(broker *moleculer.BrokerDelegates) moleculer.BrokerContext {
broker: broker,
level: 1,
parentID: "ImGroot;)",
meta: payload.Empty(),
}
return &context
}
Expand All @@ -45,12 +45,8 @@ func BrokerContext(broker *moleculer.BrokerDelegates) moleculer.BrokerContext {
func (context *Context) ChildEventContext(eventName string, params moleculer.Payload, groups []string, broadcast bool) moleculer.BrokerContext {
parentContext := context
meta := parentContext.meta
if meta == nil {
metaMap := make(map[string]interface{})
meta = &metaMap
}
if context.broker.Config.Metrics {
(*meta)["metrics"] = true
meta = meta.Add("metrics", true)
}
id := util.RandomString(12)
var requestID string
Expand Down Expand Up @@ -80,15 +76,14 @@ func (context *Context) BrokerDelegates() *moleculer.BrokerDelegates {
}

// ChildActionContext : create a chiold context for a specific action call.
func (context *Context) ChildActionContext(actionName string, params moleculer.Payload, opts ...moleculer.OptionsFunc) moleculer.BrokerContext {
func (context *Context) ChildActionContext(actionName string, params moleculer.Payload, opts ...moleculer.Options) moleculer.BrokerContext {
parentContext := context
meta := parentContext.meta
if meta == nil {
metaMap := make(map[string]interface{})
meta = &metaMap
}
if context.broker.Config.Metrics {
(*meta)["metrics"] = true
meta = meta.Add("metrics", true)
}
if len(opts) > 0 && opts[0].Meta != nil && opts[0].Meta.Len() > 0 {
meta = meta.AddMany(opts[0].Meta.RawMap())
}
id := util.RandomString(12)
var requestID string
Expand Down Expand Up @@ -118,9 +113,8 @@ func checkMaxCalls(context *Context) {
// ActionContext create an action context for remote call.
func ActionContext(broker *moleculer.BrokerDelegates, values map[string]interface{}) moleculer.BrokerContext {
var level int
var parentID string
var timeout int
var meta map[string]interface{}
var meta moleculer.Payload

sourceNodeID := values["sender"].(string)
id := values["id"].(string)
Expand All @@ -129,14 +123,22 @@ func ActionContext(broker *moleculer.BrokerDelegates, values map[string]interfac
panic(errors.New("Can't create an action context, you need a action field!"))
}
level = values["level"].(int)
parentID = values["parentID"].(string)

parentID := ""
if p, ok := values["parentID"]; ok {
if s, ok := p.(string); ok {
parentID = s
}
}
params := payload.New(values["params"])

if values["timeout"] != nil {
timeout = values["timeout"].(int)
}
if values["meta"] != nil {
meta = values["meta"].(map[string]interface{})
meta = payload.New(values["meta"])
} else {
meta = payload.Empty()
}

newContext := Context{
Expand All @@ -147,7 +149,7 @@ func ActionContext(broker *moleculer.BrokerDelegates, values map[string]interfac
actionName: actionName.(string),
parentID: parentID,
params: params,
meta: &meta,
meta: meta,
timeout: timeout,
level: level,
}
Expand All @@ -157,30 +159,29 @@ func ActionContext(broker *moleculer.BrokerDelegates, values map[string]interfac

// EventContext create an event context for a remote call.
func EventContext(broker *moleculer.BrokerDelegates, values map[string]interface{}) moleculer.BrokerContext {
var level int
var parentID string
var timeout int
var meta map[string]interface{}

var meta moleculer.Payload
sourceNodeID := values["sender"].(string)
id := values["id"].(string)
id := ""
if t, ok := values["id"]; ok {
id = t.(string)
}
eventName, isEvent := values["event"]
if !isEvent {
panic(errors.New("Can't create an event context, you need an event field!"))
}
params := payload.New(values["params"])

if values["meta"] != nil {
meta = payload.New(values["meta"])
} else {
meta = payload.Empty()
}
newContext := Context{
broker: broker,
sourceNodeID: sourceNodeID,
id: id,
eventName: eventName.(string),
broadcast: values["broadcast"].(bool),
parentID: parentID,
params: params,
meta: &meta,
timeout: timeout,
level: level,
params: payload.New(values["data"]),
meta: meta,
}
if values["groups"] != nil {
temp := values["groups"]
Expand Down Expand Up @@ -209,27 +210,29 @@ func (context *Context) RequestID() string {
func (context *Context) AsMap() map[string]interface{} {
mapResult := make(map[string]interface{})

var metrics interface{}
if context.meta != nil {
metrics = (*context.meta)["metrics"]
var metrics bool
if context.meta.Get("metrics").Exists() {
metrics = context.meta.Get("metrics").Bool()
}

mapResult["id"] = context.id
mapResult["requestID"] = context.requestID

mapResult["params"] = context.params.Value()
mapResult["level"] = context.level
if context.actionName != "" {
mapResult["action"] = context.actionName
mapResult["metrics"] = metrics
mapResult["parentID"] = context.parentID
mapResult["meta"] = (*context.meta)
mapResult["meta"] = context.meta.RawMap()
mapResult["timeout"] = context.timeout
mapResult["params"] = context.params.Value()
}
if context.eventName != "" {
mapResult["event"] = context.eventName
mapResult["groups"] = context.groups
mapResult["broadcast"] = context.broadcast
mapResult["data"] = context.params.Value()
mapResult["meta"] = context.meta.RawMap()
}

//TODO : check how to support streaming params in go
Expand All @@ -244,9 +247,9 @@ func (context *Context) MCall(callMaps map[string]map[string]interface{}) chan m

// Call : main entry point to call actions.
// chained action invocation
func (context *Context) Call(actionName string, params interface{}, opts ...moleculer.OptionsFunc) chan moleculer.Payload {
actionContext := context.ChildActionContext(actionName, payload.New(params), options.Wrap(opts))
return context.broker.ActionDelegate(actionContext, options.Wrap(opts))
func (context *Context) Call(actionName string, params interface{}, opts ...moleculer.Options) chan moleculer.Payload {
actionContext := context.ChildActionContext(actionName, payload.New(params), opts...)
return context.broker.ActionDelegate(actionContext, opts...)
}

// Emit : Emit an event (grouped & balanced global event)
Expand Down Expand Up @@ -299,10 +302,14 @@ func (context *Context) ID() string {
return context.id
}

func (context *Context) Meta() *map[string]interface{} {
func (context *Context) Meta() moleculer.Payload {
return context.meta
}

func (context *Context) UpdateMeta(meta moleculer.Payload) {
context.meta = meta
}

func (context *Context) Logger() *log.Entry {
if context.actionName != "" {
return context.broker.Logger("action", context.actionName)
Expand Down
14 changes: 7 additions & 7 deletions context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ var _ = g.Describe("Context", func() {
"sender": "test",
"id": "id",
"event": "event",
"params": map[string]interface{}{},
"data": map[string]interface{}{},
"groups": []string{"a", "b"},
"broadcast": true,
})
Expect(eventContext).ShouldNot(BeNil())
Expect(eventContext.IsBroadcast()).Should(BeTrue())
Expect(len(eventContext.AsMap())).Should(Equal(8))
Expect(len(eventContext.AsMap())).Should(Equal(9))
Expect(eventContext.EventName()).Should(Equal("event"))
Expect(eventContext.Groups()).Should(Equal([]string{"a", "b"}))
Expect(eventContext.Payload()).Should(Equal(payload.Empty()))
Expand All @@ -104,24 +104,24 @@ var _ = g.Describe("Context", func() {
brokerContext := BrokerContext(test.DelegatesWithIdAndConfig("nodex", config))
actionContext := brokerContext.ChildActionContext("actionx", nil)
Expect(actionContext.Meta()).ShouldNot(BeNil())
Expect((*actionContext.Meta())["metrics"]).Should(BeTrue())
Expect(actionContext.Meta().Get("metrics").Bool()).Should(BeTrue())

eventContext := brokerContext.ChildEventContext("eventx", nil, nil, false)
Expect(eventContext.Meta()).ShouldNot(BeNil())
Expect(eventContext.RequestID()).ShouldNot(Equal(""))
Expect((*eventContext.Meta())["metrics"]).Should(BeTrue())
Expect(actionContext.Meta().Get("metrics").Bool()).Should(BeTrue())

config = moleculer.Config{
Metrics: false,
}
brokerContext = BrokerContext(test.DelegatesWithIdAndConfig("nodex", config))
actionContext = brokerContext.ChildActionContext("actionx", nil)
Expect(actionContext.Meta()).ShouldNot(BeNil())
Expect((*actionContext.Meta())["metrics"]).Should(BeNil())
Expect(actionContext.Meta().Get("metrics").Exists()).Should(BeFalse())

eventContext = brokerContext.ChildEventContext("eventx", nil, nil, false)
Expect(eventContext.Meta()).ShouldNot(BeNil())
Expect((*eventContext.Meta())["metrics"]).Should(BeNil())
Expect(actionContext.Meta().Get("metrics").Exists()).Should(BeFalse())

})

Expand All @@ -143,7 +143,7 @@ var _ = g.Describe("Context", func() {
g.It("Should call Call and delegate it to broker", func() {
delegates := test.DelegatesWithIdAndConfig("x", moleculer.Config{})
called := false
delegates.ActionDelegate = func(context moleculer.BrokerContext, opts ...moleculer.OptionsFunc) chan moleculer.Payload {
delegates.ActionDelegate = func(context moleculer.BrokerContext, opts ...moleculer.Options) chan moleculer.Payload {
called = true
result := make(chan moleculer.Payload, 1)
result <- payload.New("value")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/lib/pq v1.1.1 // indirect
github.com/moleculer-go/cupaloy/v2 v2.5.0
github.com/moleculer-go/goemitter v1.0.0
github.com/moleculer-go/goemitter v1.0.1
github.com/nats-io/gnatsd v1.4.1 // indirect
github.com/nats-io/go-nats v1.7.2 // indirect
github.com/nats-io/go-nats-streaming v0.4.2
Expand Down
Loading

0 comments on commit 9ff7092

Please sign in to comment.