From f41d9323e374466e5e1fb3bd668c70142c2734cf Mon Sep 17 00:00:00 2001 From: Rafael Date: Tue, 14 Apr 2020 12:36:06 +1200 Subject: [PATCH 1/2] feat (namespace) registry, broker and pubsub --- broker/broker.go | 4 ++ registry/registry.go | 10 ++++- registry/registry_test.go | 92 +++++++++++++++++++++++++++++++++++++++ transit/pubsub/pubsub.go | 9 +++- 4 files changed, 112 insertions(+), 3 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index 2f5e7536..547f3fe5 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -78,6 +78,10 @@ func mergeConfigs(baseConfig moleculer.Config, userConfig []*moleculer.Config) m if config.RequestTimeout != 0 { baseConfig.RequestTimeout = config.RequestTimeout } + + if config.Namespace != "" { + baseConfig.Namespace = config.Namespace + } } } return baseConfig diff --git a/registry/registry.go b/registry/registry.go index 493efde5..d10da431 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -39,6 +39,7 @@ type ServiceRegistry struct { offlineCheckFrequency time.Duration offlineTimeout time.Duration nodeReceivedMutex *sync.Mutex + namespace string } // createTransit create a transit instance based on the config. @@ -80,6 +81,7 @@ func CreateRegistry(nodeID string, broker *moleculer.BrokerDelegates) *ServiceRe offlineTimeout: config.OfflineTimeout, stopping: false, nodeReceivedMutex: &sync.Mutex{}, + namespace: config.Namespace, } registry.logger.Debug("Service Registry created for broker: ", nodeID) @@ -256,11 +258,15 @@ func (registry *ServiceRegistry) BroadcastEvent(context moleculer.BrokerContext) func (registry *ServiceRegistry) LoadBalanceCall(context moleculer.BrokerContext, opts ...moleculer.Options) chan moleculer.Payload { actionName := context.ActionName() params := context.Payload() - registry.logger.Trace("LoadBalanceCall() - actionName: ", actionName, " params: ", params, " opts: ", opts) + + registry.logger.Trace("LoadBalanceCall() - actionName: ", actionName, " params: ", params, " namespace: ", registry.namespace, " opts: ", opts) actionEntry := registry.nextAction(actionName, registry.strategy, opts...) if actionEntry == nil { - msg := fmt.Sprint("Registry - endpoint not found for actionName: ", actionName) + msg := "Registry - endpoint not found for actionName: " + actionName + if registry.namespace != "" { + msg = msg + " namespace: " + registry.namespace + } registry.logger.Error(msg) resultChan := make(chan moleculer.Payload, 1) resultChan <- payload.Error(msg) diff --git a/registry/registry_test.go b/registry/registry_test.go index 24a97175..71a6f25a 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -265,4 +265,96 @@ var _ = Describe("Registry", func() { close(done) }, 3) }) + + Describe("Namespace", func() { + + It("Services across namespaces cannos see each other", func(done Done) { + + mem := &memory.SharedMemory{} + + devBroker := broker.New(&moleculer.Config{ + DiscoverNodeID: func() string { return "node1_devBroker" }, + LogLevel: logLevel, + Namespace: "dev", + TransporterFactory: func() interface{} { + transport := memory.Create(log.WithField("transport", "memory"), mem) + return &transport + }, + }) + + stageBroker := broker.New(&moleculer.Config{ + DiscoverNodeID: func() string { return "node1_stageBroker" }, + LogLevel: logLevel, + Namespace: "stage", + TransporterFactory: func() interface{} { + transport := memory.Create(log.WithField("transport", "memory"), mem) + return &transport + }, + }) + + //alarm service - prints the alarm and return the namespace :) + alarmService := func(namemspace string) moleculer.ServiceSchema { + return moleculer.ServiceSchema{ + Name: "alarm", + Actions: []moleculer.Action{ + { + Name: "bell", + Handler: func(context moleculer.Context, params moleculer.Payload) interface{} { + context.Logger().Info("alarm.bell ringing !!! namemspace: ", namemspace) + return namemspace + }, + }, + }, + } + } + + devOnlyService := moleculer.ServiceSchema{ + Name: "good", + Actions: []moleculer.Action{ + { + Name: "code", + Handler: func(context moleculer.Context, params moleculer.Payload) interface{} { + context.Logger().Info("nice code :)") + return "🧠" + }, + }, + }, + } + + devBroker.Publish(alarmService("dev")) + devBroker.Publish(devOnlyService) + devBroker.Start() + + stageBroker.Start() + + devAlarm := <-devBroker.Call("alarm.bell", nil) + Expect(devAlarm.IsError()).Should(BeFalse()) + Expect(devAlarm.String()).Should(Equal("dev")) + + code := <-devBroker.Call("good.code", nil) + Expect(code.IsError()).Should(BeFalse()) + Expect(code.String()).Should(Equal("🧠")) + + time.Sleep(time.Millisecond) + + //alarm.bell should not be accessible to the stage broker + stageAlarm := <-stageBroker.Call("alarm.bell", nil) + Expect(stageAlarm.IsError()).Should(BeTrue()) + Expect(stageAlarm.Error().Error()).Should(Equal("Registry - endpoint not found for actionName: alarm.bell namespace: stage")) + + stageBroker.Publish(alarmService("stage")) + stageAlarm = <-stageBroker.Call("alarm.bell", nil) + Expect(stageAlarm.IsError()).Should(BeFalse()) + Expect(stageAlarm.String()).Should(Equal("stage")) + + code = <-stageBroker.Call("good.code", nil) + Expect(code.IsError()).Should(BeTrue()) + Expect(code.Error().Error()).Should(Equal("Registry - endpoint not found for actionName: good.code namespace: stage")) + + devBroker.Stop() + stageBroker.Stop() + + close(done) + }, 2) + }) }) diff --git a/transit/pubsub/pubsub.go b/transit/pubsub/pubsub.go index 70e9165c..f78e4ca4 100644 --- a/transit/pubsub/pubsub.go +++ b/transit/pubsub/pubsub.go @@ -167,12 +167,19 @@ func (pubsub *PubSub) createTransport() transit.Transport { pubsub.logger.Info("Transporter: Memory") transport = pubsub.createMemoryTransporter() } - transport.SetPrefix("MOL") + transport.SetPrefix(resolveNamespace(pubsub.broker.Config.Namespace)) transport.SetNodeID(pubsub.broker.LocalNode().GetID()) transport.SetSerializer(pubsub.serializer) return transport } +func resolveNamespace(namespace string) string { + if namespace != "" { + return "MOL-" + namespace + } + return "MOL" +} + func (pubsub *PubSub) createMemoryTransporter() transit.Transport { pubsub.logger.Debug("createMemoryTransporter() ... ") logger := pubsub.logger.WithField("transport", "memory") From c39ffcbcb060cbc482e8961c7b2658691d5d4455 Mon Sep 17 00:00:00 2001 From: Rafael Date: Tue, 14 Apr 2020 14:04:38 +1200 Subject: [PATCH 2/2] fix (pubsub) clean up stan clientID before connection --- transit/nats/stan.go | 1 - transit/pubsub/pubsub.go | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/transit/nats/stan.go b/transit/nats/stan.go index 9501263d..6fb16565 100644 --- a/transit/nats/stan.go +++ b/transit/nats/stan.go @@ -53,7 +53,6 @@ func (transporter *StanTransporter) Connect() chan error { connection, err := stan.Connect(transporter.clusterID, transporter.clientID, stan.NatsURL(transporter.url)) if err != nil { transporter.logger.Error("STAN Connect() - Error: ", err, " clusterID: ", transporter.clusterID, " clientID: ", transporter.clientID) - //panic("Error trying to connect to stan server. url: " + transporter.url + " clusterID: " + transporter.clusterID + " clientID: " + transporter.clientID + " -> Stan error: " + error.Error()) endChan <- err return } diff --git a/transit/pubsub/pubsub.go b/transit/pubsub/pubsub.go index f78e4ca4..944b2eda 100644 --- a/transit/pubsub/pubsub.go +++ b/transit/pubsub/pubsub.go @@ -202,18 +202,18 @@ func (pubsub *PubSub) createNatsTransporter() transit.Transport { } func (pubsub *PubSub) createStanTransporter() transit.Transport { - //TODO: move this to config and params broker := pubsub.broker + logger := broker.Logger("transport", "stan") + url := "stan://" + os.Getenv("STAN_HOST") + ":4222" clusterID := "test-cluster" - localNodeID := broker.LocalNode().GetID() - logger := broker.Logger("transport", "stan") + clientID := strings.ReplaceAll(localNodeID, ".", "_") options := nats.StanOptions{ url, clusterID, - localNodeID, + clientID, logger, pubsub.serializer, func(message moleculer.Payload) bool {