Skip to content

Commit

Permalink
Merge pull request #87 from moleculer-go/feat/namespace
Browse files Browse the repository at this point in the history
Feat/namespace
  • Loading branch information
pentateu authored Apr 14, 2020
2 parents fa770e6 + c39ffcb commit ec63868
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 8 deletions.
4 changes: 4 additions & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func mergeConfigs(baseConfig moleculer.Config, userConfig []*moleculer.Config) m
if config.RequestTimeout != 0 {
baseConfig.RequestTimeout = config.RequestTimeout
}

if config.Namespace != "" {
baseConfig.Namespace = config.Namespace
}
}
}
return baseConfig
Expand Down
10 changes: 8 additions & 2 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type ServiceRegistry struct {
offlineCheckFrequency time.Duration
offlineTimeout time.Duration
nodeReceivedMutex *sync.Mutex
namespace string
}

// createTransit create a transit instance based on the config.
Expand Down Expand Up @@ -80,6 +81,7 @@ func CreateRegistry(nodeID string, broker *moleculer.BrokerDelegates) *ServiceRe
offlineTimeout: config.OfflineTimeout,
stopping: false,
nodeReceivedMutex: &sync.Mutex{},
namespace: config.Namespace,
}

registry.logger.Debug("Service Registry created for broker: ", nodeID)
Expand Down Expand Up @@ -256,11 +258,15 @@ func (registry *ServiceRegistry) BroadcastEvent(context moleculer.BrokerContext)
func (registry *ServiceRegistry) LoadBalanceCall(context moleculer.BrokerContext, opts ...moleculer.Options) chan moleculer.Payload {
actionName := context.ActionName()
params := context.Payload()
registry.logger.Trace("LoadBalanceCall() - actionName: ", actionName, " params: ", params, " opts: ", opts)

registry.logger.Trace("LoadBalanceCall() - actionName: ", actionName, " params: ", params, " namespace: ", registry.namespace, " opts: ", opts)

actionEntry := registry.nextAction(actionName, registry.strategy, opts...)
if actionEntry == nil {
msg := fmt.Sprint("Registry - endpoint not found for actionName: ", actionName)
msg := "Registry - endpoint not found for actionName: " + actionName
if registry.namespace != "" {
msg = msg + " namespace: " + registry.namespace
}
registry.logger.Error(msg)
resultChan := make(chan moleculer.Payload, 1)
resultChan <- payload.Error(msg)
Expand Down
92 changes: 92 additions & 0 deletions registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,96 @@ var _ = Describe("Registry", func() {
close(done)
}, 3)
})

Describe("Namespace", func() {

It("Services across namespaces cannos see each other", func(done Done) {

mem := &memory.SharedMemory{}

devBroker := broker.New(&moleculer.Config{
DiscoverNodeID: func() string { return "node1_devBroker" },
LogLevel: logLevel,
Namespace: "dev",
TransporterFactory: func() interface{} {
transport := memory.Create(log.WithField("transport", "memory"), mem)
return &transport
},
})

stageBroker := broker.New(&moleculer.Config{
DiscoverNodeID: func() string { return "node1_stageBroker" },
LogLevel: logLevel,
Namespace: "stage",
TransporterFactory: func() interface{} {
transport := memory.Create(log.WithField("transport", "memory"), mem)
return &transport
},
})

//alarm service - prints the alarm and return the namespace :)
alarmService := func(namemspace string) moleculer.ServiceSchema {
return moleculer.ServiceSchema{
Name: "alarm",
Actions: []moleculer.Action{
{
Name: "bell",
Handler: func(context moleculer.Context, params moleculer.Payload) interface{} {
context.Logger().Info("alarm.bell ringing !!! namemspace: ", namemspace)
return namemspace
},
},
},
}
}

devOnlyService := moleculer.ServiceSchema{
Name: "good",
Actions: []moleculer.Action{
{
Name: "code",
Handler: func(context moleculer.Context, params moleculer.Payload) interface{} {
context.Logger().Info("nice code :)")
return "🧠"
},
},
},
}

devBroker.Publish(alarmService("dev"))
devBroker.Publish(devOnlyService)
devBroker.Start()

stageBroker.Start()

devAlarm := <-devBroker.Call("alarm.bell", nil)
Expect(devAlarm.IsError()).Should(BeFalse())
Expect(devAlarm.String()).Should(Equal("dev"))

code := <-devBroker.Call("good.code", nil)
Expect(code.IsError()).Should(BeFalse())
Expect(code.String()).Should(Equal("🧠"))

time.Sleep(time.Millisecond)

//alarm.bell should not be accessible to the stage broker
stageAlarm := <-stageBroker.Call("alarm.bell", nil)
Expect(stageAlarm.IsError()).Should(BeTrue())
Expect(stageAlarm.Error().Error()).Should(Equal("Registry - endpoint not found for actionName: alarm.bell namespace: stage"))

stageBroker.Publish(alarmService("stage"))
stageAlarm = <-stageBroker.Call("alarm.bell", nil)
Expect(stageAlarm.IsError()).Should(BeFalse())
Expect(stageAlarm.String()).Should(Equal("stage"))

code = <-stageBroker.Call("good.code", nil)
Expect(code.IsError()).Should(BeTrue())
Expect(code.Error().Error()).Should(Equal("Registry - endpoint not found for actionName: good.code namespace: stage"))

devBroker.Stop()
stageBroker.Stop()

close(done)
}, 2)
})
})
1 change: 0 additions & 1 deletion transit/nats/stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (transporter *StanTransporter) Connect() chan error {
connection, err := stan.Connect(transporter.clusterID, transporter.clientID, stan.NatsURL(transporter.url))
if err != nil {
transporter.logger.Error("STAN Connect() - Error: ", err, " clusterID: ", transporter.clusterID, " clientID: ", transporter.clientID)
//panic("Error trying to connect to stan server. url: " + transporter.url + " clusterID: " + transporter.clusterID + " clientID: " + transporter.clientID + " -> Stan error: " + error.Error())
endChan <- err
return
}
Expand Down
17 changes: 12 additions & 5 deletions transit/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,19 @@ func (pubsub *PubSub) createTransport() transit.Transport {
pubsub.logger.Info("Transporter: Memory")
transport = pubsub.createMemoryTransporter()
}
transport.SetPrefix("MOL")
transport.SetPrefix(resolveNamespace(pubsub.broker.Config.Namespace))
transport.SetNodeID(pubsub.broker.LocalNode().GetID())
transport.SetSerializer(pubsub.serializer)
return transport
}

func resolveNamespace(namespace string) string {
if namespace != "" {
return "MOL-" + namespace
}
return "MOL"
}

func (pubsub *PubSub) createMemoryTransporter() transit.Transport {
pubsub.logger.Debug("createMemoryTransporter() ... ")
logger := pubsub.logger.WithField("transport", "memory")
Expand All @@ -195,18 +202,18 @@ func (pubsub *PubSub) createNatsTransporter() transit.Transport {
}

func (pubsub *PubSub) createStanTransporter() transit.Transport {
//TODO: move this to config and params
broker := pubsub.broker
logger := broker.Logger("transport", "stan")

url := "stan://" + os.Getenv("STAN_HOST") + ":4222"
clusterID := "test-cluster"

localNodeID := broker.LocalNode().GetID()
logger := broker.Logger("transport", "stan")
clientID := strings.ReplaceAll(localNodeID, ".", "_")

options := nats.StanOptions{
url,
clusterID,
localNodeID,
clientID,
logger,
pubsub.serializer,
func(message moleculer.Payload) bool {
Expand Down

0 comments on commit ec63868

Please sign in to comment.