diff --git a/.travis.yml b/.travis.yml index 22245533b..be540c413 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,4 +12,4 @@ install: - go get -v ./... - go build -v ./... -script: go test -gocheck.v -v ./... \ No newline at end of file +script: go test -v ./... diff --git a/access_logger.go b/access_logger.go index 3fb7817fd..f969f8c0d 100644 --- a/access_logger.go +++ b/access_logger.go @@ -7,12 +7,14 @@ import ( "net/http" "os" "time" + + "github.com/cloudfoundry/gorouter/route" ) type AccessLogRecord struct { Request *http.Request Response *http.Response - RouteEndpoint *RouteEndpoint + RouteEndpoint *route.Endpoint StartedAt time.Time FirstByteAt time.Time FinishedAt time.Time diff --git a/access_logger_test.go b/access_logger_test.go index 26ee16e72..92bdf2644 100644 --- a/access_logger_test.go +++ b/access_logger_test.go @@ -7,6 +7,8 @@ import ( "net/url" "regexp" "time" + + "github.com/cloudfoundry/gorouter/route" ) type AccessLoggerSuite struct{} @@ -35,7 +37,7 @@ func (s *AccessLoggerSuite) CreateAccessLogRecord() *AccessLogRecord { StatusCode: http.StatusOK, } - b := &RouteEndpoint{ + b := &route.Endpoint{ ApplicationId: "my_awesome_id", Host: "127.0.0.1", Port: 4567, diff --git a/config.go b/config/config.go similarity index 99% rename from config.go rename to config/config.go index 78fdd8d63..3109afa48 100644 --- a/config.go +++ b/config/config.go @@ -1,4 +1,4 @@ -package router +package config import ( vcap "github.com/cloudfoundry/gorouter/common" diff --git a/config_test.go b/config/config_test.go similarity index 99% rename from config_test.go rename to config/config_test.go index 54b4c7148..729b930c0 100644 --- a/config_test.go +++ b/config/config_test.go @@ -1,4 +1,4 @@ -package router +package config import ( . "launchpad.net/gocheck" diff --git a/config/init_test.go b/config/init_test.go new file mode 100644 index 000000000..359599963 --- /dev/null +++ b/config/init_test.go @@ -0,0 +1,8 @@ +package config + +import ( + . "launchpad.net/gocheck" + "testing" +) + +func Test(t *testing.T) { TestingT(t) } diff --git a/config/example.yml b/example_config/example.yml similarity index 100% rename from config/example.yml rename to example_config/example.yml diff --git a/helper_test.go b/helper_test.go index c7c4baecd..67055f340 100644 --- a/helper_test.go +++ b/helper_test.go @@ -3,13 +3,16 @@ package router import ( "errors" "fmt" - steno "github.com/cloudfoundry/gosteno" - . "launchpad.net/gocheck" "net" "os/exec" "strconv" "testing" "time" + + steno "github.com/cloudfoundry/gosteno" + . "launchpad.net/gocheck" + + "github.com/cloudfoundry/gorouter/config" ) func Test(t *testing.T) { @@ -21,13 +24,14 @@ func Test(t *testing.T) { steno.Init(config) - log = steno.NewLogger("test") + // log = steno.NewLogger("test") TestingT(t) } -func SpecConfig(natsPort, statusPort, proxyPort uint16) *Config { - c := DefaultConfig() +func SpecConfig(natsPort, statusPort, proxyPort uint16) *config.Config { + c := config.DefaultConfig() + c.Port = proxyPort c.Index = 2 c.TraceKey = "my_trace_key" @@ -41,20 +45,20 @@ func SpecConfig(natsPort, statusPort, proxyPort uint16) *Config { c.DropletStaleThreshold = 0 c.PublishActiveAppsInterval = 0 - c.Status = StatusConfig{ + c.Status = config.StatusConfig{ Port: statusPort, User: "user", Pass: "pass", } - c.Nats = NatsConfig{ + c.Nats = config.NatsConfig{ Host: "localhost", Port: natsPort, User: "nats", Pass: "nats", } - c.Logging = LoggingConfig{ + c.Logging = config.LoggingConfig{ File: "/dev/stderr", Level: "info", } diff --git a/integration_test.go b/integration_test.go index 11f1947bc..a0c2b7464 100644 --- a/integration_test.go +++ b/integration_test.go @@ -5,12 +5,15 @@ import ( "time" mbus "github.com/cloudfoundry/go_cfmessagebus" - "github.com/cloudfoundry/gorouter/test" . "launchpad.net/gocheck" + + "github.com/cloudfoundry/gorouter/config" + "github.com/cloudfoundry/gorouter/log" + "github.com/cloudfoundry/gorouter/test" ) type IntegrationSuite struct { - Config *Config + Config *config.Config mbusClient mbus.MessageBus router *Router @@ -37,9 +40,14 @@ func (s *IntegrationSuite) TestNatsConnectivity(c *C) { statusPort := nextAvailPort() s.Config = SpecConfig(s.natsPort, statusPort, proxyPort) + + // ensure the threshold is longer than the interval that we check, + // because we set the route's timestamp to time.Now() on the interval + // as part of pausing s.Config.PruneStaleDropletsInterval = 1 * time.Second + s.Config.DropletStaleThreshold = 2 * s.Config.PruneStaleDropletsInterval - SetupLoggerFromConfig(s.Config) + log.SetupLoggerFromConfig(s.Config) s.router = NewRouter(s.Config) @@ -47,13 +55,10 @@ func (s *IntegrationSuite) TestNatsConnectivity(c *C) { s.mbusClient = s.router.mbusClient - // ensure the threshold is longer than the interval that we check, - // because we set the route's timestamp to time.Now() on the interval - // as part of pausing - staleCheckInterval := s.router.registry.pruneStaleDropletsInterval - staleThreshold := 2 * staleCheckInterval + staleCheckInterval := s.Config.PruneStaleDropletsInterval + staleThreshold := s.Config.DropletStaleThreshold - s.router.registry.dropletStaleThreshold = staleThreshold + s.Config.DropletStaleThreshold = staleThreshold zombieApp := test.NewGreetApp([]string{"zombie.vcap.me"}, proxyPort, s.mbusClient, nil) zombieApp.Listen() diff --git a/log/init_test.go b/log/init_test.go new file mode 100644 index 000000000..cb1cddcec --- /dev/null +++ b/log/init_test.go @@ -0,0 +1,8 @@ +package log + +import ( + . "launchpad.net/gocheck" + "testing" +) + +func Test(t *testing.T) { TestingT(t) } diff --git a/log/logger.go b/log/logger.go new file mode 100644 index 000000000..ac657ad15 --- /dev/null +++ b/log/logger.go @@ -0,0 +1,70 @@ +package log + +import ( + "github.com/cloudfoundry/gorouter/common" + "github.com/cloudfoundry/gorouter/config" + steno "github.com/cloudfoundry/gosteno" + "os" +) + +var logger *steno.Logger + +var Counter = common.NewLogCounter() + +func init() { + stenoConfig := &steno.Config{ + Sinks: []steno.Sink{steno.NewIOSink(os.Stderr)}, + Codec: steno.NewJsonCodec(), + Level: steno.LOG_ALL, + } + + steno.Init(stenoConfig) + logger = steno.NewLogger("router.init") +} + +func SetupLoggerFromConfig(c *config.Config) { + l, err := steno.GetLogLevel(c.Logging.Level) + if err != nil { + panic(err) + } + + s := make([]steno.Sink, 0) + if c.Logging.File != "" { + s = append(s, steno.NewFileSink(c.Logging.File)) + } else { + s = append(s, steno.NewIOSink(os.Stdout)) + } + + if c.Logging.Syslog != "" { + s = append(s, steno.NewSyslogSink(c.Logging.Syslog)) + } + + s = append(s, Counter) + + stenoConfig := &steno.Config{ + Sinks: s, + Codec: steno.NewJsonCodec(), + Level: l, + } + + steno.Init(stenoConfig) + logger = steno.NewLogger("router.global") +} + +func Fatal(msg string) { logger.Fatal(msg) } +func Error(msg string) { logger.Error(msg) } +func Warn(msg string) { logger.Warn(msg) } +func Info(msg string) { logger.Info(msg) } +func Debug(msg string) { logger.Debug(msg) } + +func Fatald(data map[string]interface{}, msg string) { logger.Fatald(data, msg) } +func Errord(data map[string]interface{}, msg string) { logger.Errord(data, msg) } +func Warnd(data map[string]interface{}, msg string) { logger.Warnd(data, msg) } +func Infod(data map[string]interface{}, msg string) { logger.Infod(data, msg) } +func Debugd(data map[string]interface{}, msg string) { logger.Debugd(data, msg) } + +func Fatalf(msg string, vals ...interface{}) { logger.Fatalf(msg, vals...) } +func Errorf(msg string, vals ...interface{}) { logger.Errorf(msg, vals...) } +func Warnf(msg string, vals ...interface{}) { logger.Warnf(msg, vals...) } +func Infof(msg string, vals ...interface{}) { logger.Infof(msg, vals...) } +func Debugf(msg string, vals ...interface{}) { logger.Debugf(msg, vals...) } diff --git a/logger_test.go b/log/logger_test.go similarity index 64% rename from logger_test.go rename to log/logger_test.go index b78d133a3..6bd3acb94 100644 --- a/logger_test.go +++ b/log/logger_test.go @@ -1,8 +1,10 @@ -package router +package log import ( steno "github.com/cloudfoundry/gosteno" . "launchpad.net/gocheck" + + "github.com/cloudfoundry/gorouter/config" ) type LoggerSuite struct{} @@ -10,13 +12,13 @@ type LoggerSuite struct{} var _ = Suite(&LoggerSuite{}) func (s *LoggerSuite) TestSetupLoggerFromConfig(c *C) { - cfg := DefaultConfig() + cfg := config.DefaultConfig() cfg.Logging.File = "/tmp/gorouter.log" SetupLoggerFromConfig(cfg) - count := logCounter.GetCount("info") + count := Counter.GetCount("info") logger := steno.NewLogger("test") logger.Info("Hello") - c.Assert(logCounter.GetCount("info"), Equals, count+1) + c.Assert(Counter.GetCount("info"), Equals, count+1) } diff --git a/logger.go b/logger.go deleted file mode 100644 index fbfc1224b..000000000 --- a/logger.go +++ /dev/null @@ -1,50 +0,0 @@ -package router - -import ( - "github.com/cloudfoundry/gorouter/common" - steno "github.com/cloudfoundry/gosteno" - "os" -) - -var log *steno.Logger -var logCounter = common.NewLogCounter() - -func init() { - stenoConfig := &steno.Config{ - Sinks: []steno.Sink{steno.NewIOSink(os.Stderr)}, - Codec: steno.NewJsonCodec(), - Level: steno.LOG_ALL, - } - - steno.Init(stenoConfig) - log = steno.NewLogger("router.init") -} - -func SetupLoggerFromConfig(c *Config) { - l, err := steno.GetLogLevel(c.Logging.Level) - if err != nil { - panic(err) - } - - s := make([]steno.Sink, 0) - if c.Logging.File != "" { - s = append(s, steno.NewFileSink(c.Logging.File)) - } else { - s = append(s, steno.NewIOSink(os.Stdout)) - } - - if c.Logging.Syslog != "" { - s = append(s, steno.NewSyslogSink(c.Logging.Syslog)) - } - - s = append(s, logCounter) - - stenoConfig := &steno.Config{ - Sinks: s, - Codec: steno.NewJsonCodec(), - Level: l, - } - - steno.Init(stenoConfig) - log = steno.NewLogger("router.global") -} diff --git a/perf_test.go b/perf_test.go index 322262a03..6ecee5e4a 100644 --- a/perf_test.go +++ b/perf_test.go @@ -4,6 +4,10 @@ import ( "github.com/cloudfoundry/go_cfmessagebus/mock_cfmessagebus" "strconv" "testing" + + "github.com/cloudfoundry/gorouter/config" + "github.com/cloudfoundry/gorouter/registry" + "github.com/cloudfoundry/gorouter/route" ) const ( @@ -12,17 +16,17 @@ const ( ) func BenchmarkRegister(b *testing.B) { - c := DefaultConfig() + c := config.DefaultConfig() mbus := mock_cfmessagebus.NewMockMessageBus() - r := NewRegistry(c, mbus) + r := registry.NewRegistry(c, mbus) p := NewProxy(c, r, NewVarz(r)) for i := 0; i < b.N; i++ { str := strconv.Itoa(i) - p.Register(&RouteEndpoint{ + p.Register(&route.Endpoint{ Host: "localhost", Port: uint16(i), - Uris: []Uri{Uri("bench.vcap.me." + str)}, + Uris: []route.Uri{route.Uri("bench.vcap.me." + str)}, }) } } diff --git a/proxy.go b/proxy.go index a922b5223..4459771a8 100644 --- a/proxy.go +++ b/proxy.go @@ -3,7 +3,6 @@ package router import ( "bufio" "fmt" - steno "github.com/cloudfoundry/gosteno" "io" "net" "net/http" @@ -11,6 +10,12 @@ import ( "strings" "sync" "time" + + steno "github.com/cloudfoundry/gosteno" + + "github.com/cloudfoundry/gorouter/config" + "github.com/cloudfoundry/gorouter/registry" + "github.com/cloudfoundry/gorouter/route" ) const ( @@ -26,8 +31,8 @@ const ( type Proxy struct { sync.RWMutex *steno.Logger - *Config - *Registry + *config.Config + *registry.Registry Varz *AccessLogger } @@ -65,7 +70,7 @@ func (rw *responseWriter) CopyFrom(src io.Reader) (int64, error) { return io.Copy(dst, src) } -func NewProxy(c *Config, r *Registry, v Varz) *Proxy { +func NewProxy(c *config.Config, r *registry.Registry, v Varz) *Proxy { p := &Proxy{ Config: c, Logger: steno.NewLogger("router.proxy"), @@ -98,7 +103,7 @@ func hostWithoutPort(req *http.Request) string { return host } -func (proxy *Proxy) Lookup(request *http.Request) (*RouteEndpoint, bool) { +func (proxy *Proxy) Lookup(request *http.Request) (*route.Endpoint, bool) { host := hostWithoutPort(request) // Try choosing a backend using sticky session diff --git a/proxy_test.go b/proxy_test.go index 7c9b9a5a8..b43bc2ecf 100644 --- a/proxy_test.go +++ b/proxy_test.go @@ -13,6 +13,10 @@ import ( "github.com/cloudfoundry/go_cfmessagebus/mock_cfmessagebus" . "launchpad.net/gocheck" + + "github.com/cloudfoundry/gorouter/config" + "github.com/cloudfoundry/gorouter/registry" + "github.com/cloudfoundry/gorouter/route" ) type connHandler func(*conn) @@ -21,9 +25,9 @@ type nullVarz struct{} func (_ nullVarz) MarshalJSON() ([]byte, error) { return json.Marshal(nil) } -func (_ nullVarz) CaptureBadRequest(req *http.Request) {} -func (_ nullVarz) CaptureRoutingRequest(b *RouteEndpoint, req *http.Request) {} -func (_ nullVarz) CaptureRoutingResponse(b *RouteEndpoint, res *http.Response, d time.Duration) {} +func (_ nullVarz) CaptureBadRequest(req *http.Request) {} +func (_ nullVarz) CaptureRoutingRequest(b *route.Endpoint, req *http.Request) {} +func (_ nullVarz) CaptureRoutingResponse(b *route.Endpoint, res *http.Response, d time.Duration) {} type conn struct { net.Conn @@ -119,7 +123,7 @@ func (x *conn) WriteLines(lines []string) { } type ProxySuite struct { - r *Registry + r *registry.Registry p *Proxy proxyServer net.Listener @@ -131,11 +135,11 @@ type ProxySuite struct { var _ = Suite(&ProxySuite{}) func (s *ProxySuite) SetUpTest(c *C) { - x := DefaultConfig() + x := config.DefaultConfig() x.TraceKey = "my_trace_key" mbus := mock_cfmessagebus.NewMockMessageBus() - s.r = NewRegistry(x, mbus) + s.r = registry.NewRegistry(x, mbus) s.p = NewProxy(x, s.r, nullVarz{}) ln, err := net.Listen("tcp", "127.0.0.1:0") @@ -163,10 +167,10 @@ func (s *ProxySuite) registerAddr(u string, a net.Addr) { panic(err) } - s.r.Register(&RouteEndpoint{ + s.r.Register(&route.Endpoint{ Host: h, Port: uint16(x), - Uris: []Uri{Uri(u)}, + Uris: []route.Uri{route.Uri(u)}, }) } diff --git a/registry/init_test.go b/registry/init_test.go new file mode 100644 index 000000000..0c3e9fe74 --- /dev/null +++ b/registry/init_test.go @@ -0,0 +1,8 @@ +package registry + +import ( + . "launchpad.net/gocheck" + "testing" +) + +func Test(t *testing.T) { TestingT(t) } diff --git a/registry.go b/registry/registry.go similarity index 57% rename from registry.go rename to registry/registry.go index d652ef8ad..474722330 100644 --- a/registry.go +++ b/registry/registry.go @@ -1,4 +1,4 @@ -package router +package registry import ( "encoding/json" @@ -9,6 +9,10 @@ import ( "github.com/cloudfoundry/gorouter/stats" "github.com/cloudfoundry/gorouter/util" steno "github.com/cloudfoundry/gosteno" + + "github.com/cloudfoundry/gorouter/config" + "github.com/cloudfoundry/gorouter/log" + "github.com/cloudfoundry/gorouter/route" ) type Registry struct { @@ -19,8 +23,8 @@ type Registry struct { *stats.ActiveApps *stats.TopApps - byUri map[Uri]*EndpointPool - byAddr map[string]*RouteEndpoint + byUri map[route.Uri]*route.Pool + byAddr map[string]*route.Endpoint staleTracker *util.ListMap @@ -32,28 +36,28 @@ type Registry struct { timeOfLastUpdate time.Time } -func NewRegistry(c *Config, messageBusClient mbus.MessageBus) *Registry { - r := &Registry{ - messageBus: messageBusClient, - } +func NewRegistry(c *config.Config, mbus mbus.MessageBus) *Registry { + r := &Registry{} r.Logger = steno.NewLogger("router.registry") r.ActiveApps = stats.NewActiveApps() r.TopApps = stats.NewTopApps() - r.byUri = make(map[Uri]*EndpointPool) - r.byAddr = make(map[string]*RouteEndpoint) + r.byUri = make(map[route.Uri]*route.Pool) + r.byAddr = make(map[string]*route.Endpoint) r.staleTracker = util.NewListMap() r.pruneStaleDropletsInterval = c.PruneStaleDropletsInterval r.dropletStaleThreshold = c.DropletStaleThreshold + r.messageBus = mbus + return r } -func (registry *Registry) Register(endpoint *RouteEndpoint) { +func (registry *Registry) Register(endpoint *route.Endpoint) { if len(endpoint.Uris) == 0 { return } @@ -63,29 +67,33 @@ func (registry *Registry) Register(endpoint *RouteEndpoint) { addr := endpoint.CanonicalAddr() - routeEndpoint, found := registry.byAddr[addr] - if !found { + endpointToRegister, found := registry.byAddr[addr] + if found { + for _, uri := range endpoint.Uris { + endpointToRegister.Register(uri) + } + } else { registry.byAddr[addr] = endpoint - routeEndpoint = endpoint + endpointToRegister = endpoint } - for _, uri := range endpoint.Uris { + for _, uri := range endpointToRegister.Uris { pool, found := registry.byUri[uri.ToLower()] if !found { - pool = NewEndpointPool() + pool = route.NewPool() registry.byUri[uri.ToLower()] = pool } - pool.Add(endpoint) + pool.Add(endpointToRegister) } - routeEndpoint.updated_at = time.Now() + endpointToRegister.UpdatedAtFORNOW = time.Now() - registry.staleTracker.PushBack(routeEndpoint) + registry.staleTracker.PushBack(endpointToRegister) registry.timeOfLastUpdate = time.Now() } -func (registry *Registry) Unregister(endpoint *RouteEndpoint) { +func (registry *Registry) Unregister(endpoint *route.Endpoint) { registry.Lock() defer registry.Unlock() @@ -101,11 +109,11 @@ func (registry *Registry) Unregister(endpoint *RouteEndpoint) { } } -func (r *Registry) Lookup(host string) (*RouteEndpoint, bool) { +func (r *Registry) Lookup(host string) (*route.Endpoint, bool) { r.RLock() defer r.RUnlock() - x, ok := r.byUri[Uri(host).ToLower()] + x, ok := r.byUri[route.Uri(host).ToLower()] if !ok { return nil, false } @@ -117,8 +125,8 @@ func (registry *Registry) StartPruningCycle() { go registry.checkAndPrune() } -func (registry *Registry) IsStale(routeEndpoint *RouteEndpoint) bool { - return routeEndpoint.updated_at.Add(registry.dropletStaleThreshold).Before(time.Now()) +func (registry *Registry) IsStale(endpoint *route.Endpoint) bool { + return endpoint.UpdatedAtFORNOW.Add(registry.dropletStaleThreshold).Before(time.Now()) } func (registry *Registry) PruneStaleDroplets() { @@ -134,11 +142,11 @@ func (registry *Registry) PruneStaleDroplets() { registry.pruneStaleDroplets() } -func (r *Registry) LookupByPrivateInstanceId(host string, p string) (*RouteEndpoint, bool) { +func (r *Registry) LookupByPrivateInstanceId(host string, p string) (*route.Endpoint, bool) { r.RLock() defer r.RUnlock() - x, ok := r.byUri[Uri(host).ToLower()] + x, ok := r.byUri[route.Uri(host).ToLower()] if !ok { return nil, false } @@ -146,7 +154,7 @@ func (r *Registry) LookupByPrivateInstanceId(host string, p string) (*RouteEndpo return x.FindByPrivateInstanceId(p) } -func (r *Registry) CaptureRoutingRequest(x *RouteEndpoint, t time.Time) { +func (r *Registry) CaptureRoutingRequest(x *route.Endpoint, t time.Time) { if x.ApplicationId != "" { r.ActiveApps.Mark(x.ApplicationId, t) r.TopApps.Mark(x.ApplicationId, t) @@ -160,7 +168,11 @@ func (registry *Registry) NumUris() int { return len(registry.byUri) } -func (r *Registry) NumRouteEndpoints() int { +func (r *Registry) TimeOfLastUpdate() time.Time { + return r.timeOfLastUpdate +} + +func (r *Registry) NumEndpoints() int { r.RLock() defer r.RUnlock() @@ -180,24 +192,24 @@ func (registry *Registry) isStateStale() bool { func (registry *Registry) pruneStaleDroplets() { for registry.staleTracker.Len() > 0 { - routeEndpoint := registry.staleTracker.Front().(*RouteEndpoint) - if !registry.IsStale(routeEndpoint) { - log.Infof("Droplet is not stale; NOT pruning: %v", routeEndpoint.CanonicalAddr()) + endpoint := registry.staleTracker.Front().(*route.Endpoint) + if !registry.IsStale(endpoint) { + log.Infof("Droplet is not stale; NOT pruning: %v", endpoint.CanonicalAddr()) break } - log.Infof("Pruning stale droplet: %v", routeEndpoint.CanonicalAddr()) + log.Infof("Pruning stale droplet: %v", endpoint.CanonicalAddr()) - for _, uri := range routeEndpoint.Uris { - log.Infof("Pruning stale droplet: %v, uri: %s", routeEndpoint.CanonicalAddr(), uri) - registry.unregisterUri(routeEndpoint, uri) + for _, uri := range endpoint.Uris { + log.Infof("Pruning stale droplet: %v, uri: %s", endpoint.CanonicalAddr(), uri) + registry.unregisterUri(endpoint, uri) } } } func (registry *Registry) pauseStaleTracker() { for routeElement := registry.staleTracker.FrontElement(); routeElement != nil; routeElement = routeElement.Next() { - routeElement.Value.(*RouteEndpoint).updated_at = time.Now() + routeElement.Value.(*route.Endpoint).UpdatedAtFORNOW = time.Now() } } @@ -216,23 +228,23 @@ func (r *Registry) checkAndPrune() { } } -func (registry *Registry) unregisterUri(routeEndpoint *RouteEndpoint, uri Uri) { +func (registry *Registry) unregisterUri(endpoint *route.Endpoint, uri route.Uri) { uri = uri.ToLower() - ok := routeEndpoint.unregister(uri) + ok := endpoint.Unregister(uri) if ok { - routeEndpoints := registry.byUri[uri] + endpoints := registry.byUri[uri] - routeEndpoints.Remove(routeEndpoint) + endpoints.Remove(endpoint) - if routeEndpoints.IsEmpty() { + if endpoints.IsEmpty() { delete(registry.byUri, uri) } } // Remove backend if it no longer has uris - if len(routeEndpoint.Uris) == 0 { - delete(registry.byAddr, routeEndpoint.CanonicalAddr()) - registry.staleTracker.Delete(routeEndpoint) + if len(endpoint.Uris) == 0 { + delete(registry.byAddr, endpoint.CanonicalAddr()) + registry.staleTracker.Delete(endpoint) } } diff --git a/registry_test.go b/registry/registry_test.go similarity index 75% rename from registry_test.go rename to registry/registry_test.go index 96caba595..405d8788e 100644 --- a/registry_test.go +++ b/registry/registry_test.go @@ -1,11 +1,14 @@ -package router +package registry import ( + "encoding/json" "time" - "encoding/json" "github.com/cloudfoundry/go_cfmessagebus/mock_cfmessagebus" . "launchpad.net/gocheck" + + "github.com/cloudfoundry/gorouter/config" + "github.com/cloudfoundry/gorouter/route" ) type RegistrySuite struct { @@ -16,21 +19,21 @@ type RegistrySuite struct { var _ = Suite(&RegistrySuite{}) -var fooEndpoint, barEndpoint, bar2Endpoint *RouteEndpoint +var fooEndpoint, barEndpoint, bar2Endpoint *route.Endpoint func (s *RegistrySuite) SetUpTest(c *C) { - var configObj *Config + var configObj *config.Config - configObj = DefaultConfig() + configObj = config.DefaultConfig() configObj.DropletStaleThreshold = 10 * time.Millisecond s.messageBus = mock_cfmessagebus.NewMockMessageBus() s.Registry = NewRegistry(configObj, s.messageBus) - fooEndpoint = &RouteEndpoint{ + fooEndpoint = &route.Endpoint{ Host: "192.168.1.1", Port: 1234, - Uris: []Uri{"foo.vcap.me", "fooo.vcap.me"}, + Uris: []route.Uri{"foo.vcap.me", "fooo.vcap.me"}, ApplicationId: "12345", Tags: map[string]string{ @@ -39,10 +42,10 @@ func (s *RegistrySuite) SetUpTest(c *C) { }, } - barEndpoint = &RouteEndpoint{ + barEndpoint = &route.Endpoint{ Host: "192.168.1.2", Port: 4321, - Uris: []Uri{"bar.vcap.me", "barr.vcap.me"}, + Uris: []route.Uri{"bar.vcap.me", "barr.vcap.me"}, ApplicationId: "54321", Tags: map[string]string{ @@ -51,10 +54,10 @@ func (s *RegistrySuite) SetUpTest(c *C) { }, } - bar2Endpoint = &RouteEndpoint{ + bar2Endpoint = &route.Endpoint{ Host: "192.168.1.3", Port: 1234, - Uris: []Uri{"bar.vcap.me", "barr.vcap.me"}, + Uris: []route.Uri{"bar.vcap.me", "barr.vcap.me"}, ApplicationId: "54321", Tags: map[string]string{ @@ -78,36 +81,36 @@ func (s *RegistrySuite) TestRegister(c *C) { } func (s *RegistrySuite) TestRegisterIgnoreEmpty(c *C) { - s.Register(&RouteEndpoint{}) + s.Register(&route.Endpoint{}) c.Check(s.NumUris(), Equals, 0) - c.Check(s.NumRouteEndpoints(), Equals, 0) + c.Check(s.NumEndpoints(), Equals, 0) } func (s *RegistrySuite) TestRegisterIgnoreDuplicates(c *C) { s.Register(barEndpoint) c.Check(s.NumUris(), Equals, 2) - c.Check(s.NumRouteEndpoints(), Equals, 1) + c.Check(s.NumEndpoints(), Equals, 1) s.Register(barEndpoint) c.Check(s.NumUris(), Equals, 2) - c.Check(s.NumRouteEndpoints(), Equals, 1) + c.Check(s.NumEndpoints(), Equals, 1) s.Unregister(barEndpoint) c.Check(s.NumUris(), Equals, 0) - c.Check(s.NumRouteEndpoints(), Equals, 0) + c.Check(s.NumEndpoints(), Equals, 0) } func (s *RegistrySuite) TestRegisterUppercase(c *C) { - m1 := &RouteEndpoint{ + m1 := &route.Endpoint{ Host: "192.168.1.1", Port: 1234, - Uris: []Uri{"foo.vcap.me"}, + Uris: []route.Uri{"foo.vcap.me"}, } - m2 := &RouteEndpoint{ + m2 := &route.Endpoint{ Host: "192.168.1.1", Port: 1235, - Uris: []Uri{"FOO.VCAP.ME"}, + Uris: []route.Uri{"FOO.VCAP.ME"}, } s.Register(m1) @@ -117,66 +120,67 @@ func (s *RegistrySuite) TestRegisterUppercase(c *C) { } func (s *RegistrySuite) TestRegisterDoesntReplace(c *C) { - m1 := &RouteEndpoint{ + m1 := &route.Endpoint{ Host: "192.168.1.1", Port: 1234, - Uris: []Uri{"foo.vcap.me"}, + Uris: []route.Uri{"foo.vcap.me"}, } - m2 := &RouteEndpoint{ + m2 := &route.Endpoint{ Host: "192.168.1.1", Port: 1234, - Uris: []Uri{"bar.vcap.me"}, + Uris: []route.Uri{"bar.vcap.me"}, } s.Register(m1) s.Register(m2) c.Check(s.NumUris(), Equals, 2) + c.Check(s.NumEndpoints(), Equals, 1) } func (s *RegistrySuite) TestRegisterWithoutUris(c *C) { - m := &RouteEndpoint{ + m := &route.Endpoint{ Host: "192.168.1.1", Port: 1234, - Uris: []Uri{}, + Uris: []route.Uri{}, } s.Register(m) c.Check(s.NumUris(), Equals, 0) - c.Check(s.NumRouteEndpoints(), Equals, 0) + c.Check(s.NumEndpoints(), Equals, 0) } func (s *RegistrySuite) TestUnregister(c *C) { s.Register(barEndpoint) c.Check(s.NumUris(), Equals, 2) - c.Check(s.NumRouteEndpoints(), Equals, 1) + c.Check(s.NumEndpoints(), Equals, 1) s.Register(bar2Endpoint) c.Check(s.NumUris(), Equals, 2) - c.Check(s.NumRouteEndpoints(), Equals, 2) + c.Check(s.NumEndpoints(), Equals, 2) s.Unregister(barEndpoint) c.Check(s.NumUris(), Equals, 2) - c.Check(s.NumRouteEndpoints(), Equals, 1) + c.Check(s.NumEndpoints(), Equals, 1) s.Unregister(bar2Endpoint) c.Check(s.NumUris(), Equals, 0) - c.Check(s.NumRouteEndpoints(), Equals, 0) + c.Check(s.NumEndpoints(), Equals, 0) } func (s *RegistrySuite) TestUnregisterUppercase(c *C) { - m1 := &RouteEndpoint{ + m1 := &route.Endpoint{ Host: "192.168.1.1", Port: 1234, - Uris: []Uri{"foo.vcap.me"}, + Uris: []route.Uri{"foo.vcap.me"}, } - m2 := &RouteEndpoint{ + m2 := &route.Endpoint{ Host: "192.168.1.1", Port: 1234, - Uris: []Uri{"FOO.VCAP.ME"}, + Uris: []route.Uri{"FOO.VCAP.ME"}, } s.Register(m1) @@ -186,16 +190,16 @@ func (s *RegistrySuite) TestUnregisterUppercase(c *C) { } func (s *RegistrySuite) TestUnregisterDoesntDemolish(c *C) { - m1 := &RouteEndpoint{ + m1 := &route.Endpoint{ Host: "192.168.1.1", Port: 1234, - Uris: []Uri{"foo.vcap.me", "bar.vcap.me"}, + Uris: []route.Uri{"foo.vcap.me", "bar.vcap.me"}, } - m2 := &RouteEndpoint{ + m2 := &route.Endpoint{ Host: "192.168.1.1", Port: 1234, - Uris: []Uri{"foo.vcap.me"}, + Uris: []route.Uri{"foo.vcap.me"}, } s.Register(m1) @@ -205,15 +209,15 @@ func (s *RegistrySuite) TestUnregisterDoesntDemolish(c *C) { } func (s *RegistrySuite) TestLookup(c *C) { - m := &RouteEndpoint{ + m := &route.Endpoint{ Host: "192.168.1.1", Port: 1234, - Uris: []Uri{"foo.vcap.me"}, + Uris: []route.Uri{"foo.vcap.me"}, } s.Register(m) - var b *RouteEndpoint + var b *route.Endpoint var ok bool b, ok = s.Lookup("foo.vcap.me") @@ -226,23 +230,23 @@ func (s *RegistrySuite) TestLookup(c *C) { } func (s *RegistrySuite) TestLookupDoubleRegister(c *C) { - m1 := &RouteEndpoint{ + m1 := &route.Endpoint{ Host: "192.168.1.2", Port: 1234, - Uris: []Uri{"bar.vcap.me", "barr.vcap.me"}, + Uris: []route.Uri{"bar.vcap.me", "barr.vcap.me"}, } - m2 := &RouteEndpoint{ + m2 := &route.Endpoint{ Host: "192.168.1.2", Port: 1235, - Uris: []Uri{"bar.vcap.me", "barr.vcap.me"}, + Uris: []route.Uri{"bar.vcap.me", "barr.vcap.me"}, } s.Register(m1) s.Register(m2) c.Check(s.NumUris(), Equals, 2) - c.Check(s.NumRouteEndpoints(), Equals, 2) + c.Check(s.NumEndpoints(), Equals, 2) } func (s *RegistrySuite) TestTracker(c *C) { @@ -263,7 +267,7 @@ func (s *RegistrySuite) TestPruneStaleApps(c *C) { s.Register(fooEndpoint) s.Register(barEndpoint) c.Check(s.NumUris(), Equals, 4) - c.Check(s.NumRouteEndpoints(), Equals, 2) + c.Check(s.NumEndpoints(), Equals, 2) c.Assert(s.staleTracker.Len(), Equals, 2) time.Sleep(s.dropletStaleThreshold + 1*time.Millisecond) @@ -272,7 +276,7 @@ func (s *RegistrySuite) TestPruneStaleApps(c *C) { s.Register(bar2Endpoint) c.Check(s.NumUris(), Equals, 2) - c.Check(s.NumRouteEndpoints(), Equals, 1) + c.Check(s.NumEndpoints(), Equals, 1) c.Assert(s.staleTracker.Len(), Equals, 1) } @@ -280,7 +284,7 @@ func (s *RegistrySuite) TestPruneStaleAppsWhenStateStale(c *C) { s.Register(fooEndpoint) s.Register(barEndpoint) c.Check(s.NumUris(), Equals, 4) - c.Check(s.NumRouteEndpoints(), Equals, 2) + c.Check(s.NumEndpoints(), Equals, 2) c.Assert(s.staleTracker.Len(), Equals, 2) time.Sleep(s.dropletStaleThreshold + 1*time.Millisecond) @@ -292,7 +296,7 @@ func (s *RegistrySuite) TestPruneStaleAppsWhenStateStale(c *C) { s.PruneStaleDroplets() c.Check(s.NumUris(), Equals, 4) - c.Check(s.NumRouteEndpoints(), Equals, 2) + c.Check(s.NumEndpoints(), Equals, 2) c.Assert(s.staleTracker.Len(), Equals, 2) routeEndpoint, _ := s.Lookup("foo.vcap.me") @@ -331,10 +335,10 @@ func (s *RegistrySuite) TestPruneStaleDropletsDoesNotDeadlock(c *C) { } func (s *RegistrySuite) TestInfoMarshalling(c *C) { - m := &RouteEndpoint{ + m := &route.Endpoint{ Host: "192.168.1.1", Port: 1234, - Uris: []Uri{"foo.vcap.me"}, + Uris: []route.Uri{"foo.vcap.me"}, } s.Register(m) diff --git a/route/endpoint.go b/route/endpoint.go new file mode 100644 index 000000000..536dfd759 --- /dev/null +++ b/route/endpoint.go @@ -0,0 +1,62 @@ +package route + +import ( + "encoding/json" + "fmt" + "sync" + "time" +) + +type Endpoint struct { + sync.Mutex + + ApplicationId string + Host string + Port uint16 + Tags map[string]string + PrivateInstanceId string + + Uris Uris + + UpdatedAtFORNOW time.Time +} + +func (e *Endpoint) MarshalJSON() ([]byte, error) { + return json.Marshal(e.CanonicalAddr()) +} + +func (e *Endpoint) CanonicalAddr() string { + return fmt.Sprintf("%s:%d", e.Host, e.Port) +} + +func (e *Endpoint) ToLogData() interface{} { + return struct { + ApplicationId string + Host string + Port uint16 + Tags map[string]string + }{ + e.ApplicationId, + e.Host, + e.Port, + e.Tags, + } +} + +func (e *Endpoint) Register(uri Uri) bool { + if !e.Uris.Has(uri) { + e.Uris = append(e.Uris, uri) + return true + } + + return false +} + +func (e *Endpoint) Unregister(uri Uri) bool { + remainingUris, ok := e.Uris.Remove(uri) + if ok { + e.Uris = remainingUris + } + + return ok +} diff --git a/route/init_test.go b/route/init_test.go new file mode 100644 index 000000000..22f5a30be --- /dev/null +++ b/route/init_test.go @@ -0,0 +1,8 @@ +package route + +import ( + . "launchpad.net/gocheck" + "testing" +) + +func Test(t *testing.T) { TestingT(t) } diff --git a/endpoint_pool.go b/route/pool.go similarity index 56% rename from endpoint_pool.go rename to route/pool.go index 8a0e0846f..1986510ac 100644 --- a/endpoint_pool.go +++ b/route/pool.go @@ -1,29 +1,29 @@ -package router +package route import ( "encoding/json" "math/rand" ) -type EndpointPool struct { - endpoints map[string]*RouteEndpoint +type Pool struct { + endpoints map[string]*Endpoint } -func NewEndpointPool() *EndpointPool { - return &EndpointPool{ - endpoints: make(map[string]*RouteEndpoint), +func NewPool() *Pool { + return &Pool{ + endpoints: make(map[string]*Endpoint), } } -func (p *EndpointPool) Add(endpoint *RouteEndpoint) { +func (p *Pool) Add(endpoint *Endpoint) { p.endpoints[endpoint.CanonicalAddr()] = endpoint } -func (p *EndpointPool) Remove(endpoint *RouteEndpoint) { +func (p *Pool) Remove(endpoint *Endpoint) { delete(p.endpoints, endpoint.CanonicalAddr()) } -func (p *EndpointPool) Sample() (*RouteEndpoint, bool) { +func (p *Pool) Sample() (*Endpoint, bool) { if len(p.endpoints) == 0 { return nil, false } @@ -42,7 +42,7 @@ func (p *EndpointPool) Sample() (*RouteEndpoint, bool) { panic("unreachable") } -func (p *EndpointPool) FindByPrivateInstanceId(id string) (*RouteEndpoint, bool) { +func (p *Pool) FindByPrivateInstanceId(id string) (*Endpoint, bool) { for _, endpoint := range p.endpoints { if endpoint.PrivateInstanceId == id { return endpoint, true @@ -52,11 +52,11 @@ func (p *EndpointPool) FindByPrivateInstanceId(id string) (*RouteEndpoint, bool) return nil, false } -func (p *EndpointPool) IsEmpty() bool { +func (p *Pool) IsEmpty() bool { return len(p.endpoints) == 0 } -func (p *EndpointPool) MarshalJSON() ([]byte, error) { +func (p *Pool) MarshalJSON() ([]byte, error) { addresses := []string{} for addr, _ := range p.endpoints { diff --git a/endpoint_pool_test.go b/route/pool_test.go similarity index 56% rename from endpoint_pool_test.go rename to route/pool_test.go index 83c0a8a95..dc3020e7c 100644 --- a/endpoint_pool_test.go +++ b/route/pool_test.go @@ -1,20 +1,20 @@ -package router +package route import ( . "launchpad.net/gocheck" "math" ) -type EPSuite struct{} +type PSuite struct{} func init() { - Suite(&EPSuite{}) + Suite(&PSuite{}) } -func (s *EPSuite) TestEndpointPoolAddingAndRemoving(c *C) { - pool := NewEndpointPool() +func (s *PSuite) TestPoolAddingAndRemoving(c *C) { + pool := NewPool() - endpoint := &RouteEndpoint{} + endpoint := &Endpoint{} pool.Add(endpoint) @@ -28,10 +28,10 @@ func (s *EPSuite) TestEndpointPoolAddingAndRemoving(c *C) { c.Assert(found, Equals, false) } -func (s *EPSuite) TestEndpointPoolAddingDoesNotDuplicate(c *C) { - pool := NewEndpointPool() +func (s *PSuite) TestPoolAddingDoesNotDuplicate(c *C) { + pool := NewPool() - endpoint := &RouteEndpoint{} + endpoint := &Endpoint{} pool.Add(endpoint) pool.Add(endpoint) @@ -46,11 +46,11 @@ func (s *EPSuite) TestEndpointPoolAddingDoesNotDuplicate(c *C) { c.Assert(found, Equals, false) } -func (s *EPSuite) TestEndpointPoolAddingEquivalentEndpointsDoesNotDuplicate(c *C) { - pool := NewEndpointPool() +func (s *PSuite) TestPoolAddingEquivalentEndpointsDoesNotDuplicate(c *C) { + pool := NewPool() - endpoint1 := &RouteEndpoint{Host: "1.2.3.4", Port: 5678} - endpoint2 := &RouteEndpoint{Host: "1.2.3.4", Port: 5678} + endpoint1 := &Endpoint{Host: "1.2.3.4", Port: 5678} + endpoint2 := &Endpoint{Host: "1.2.3.4", Port: 5678} pool.Add(endpoint1) pool.Add(endpoint2) @@ -64,14 +64,14 @@ func (s *EPSuite) TestEndpointPoolAddingEquivalentEndpointsDoesNotDuplicate(c *C c.Assert(found, Equals, false) } -func (s *EPSuite) TestEndpointPoolIsEmptyInitially(c *C) { - c.Assert(NewEndpointPool().IsEmpty(), Equals, true) +func (s *PSuite) TestPoolIsEmptyInitially(c *C) { + c.Assert(NewPool().IsEmpty(), Equals, true) } -func (s *EPSuite) TestEndpointPoolIsEmptyAfterRemovingEverything(c *C) { - pool := NewEndpointPool() +func (s *PSuite) TestPoolIsEmptyAfterRemovingEverything(c *C) { + pool := NewPool() - endpoint := &RouteEndpoint{} + endpoint := &Endpoint{} pool.Add(endpoint) @@ -82,11 +82,11 @@ func (s *EPSuite) TestEndpointPoolIsEmptyAfterRemovingEverything(c *C) { c.Assert(pool.IsEmpty(), Equals, true) } -func (s *EPSuite) TestEndpointPoolFindByPrivateInstanceId(c *C) { - pool := NewEndpointPool() +func (s *PSuite) TestPoolFindByPrivateInstanceId(c *C) { + pool := NewPool() - endpointFoo := &RouteEndpoint{Host: "1.2.3.4", Port: 1234, PrivateInstanceId: "foo"} - endpointBar := &RouteEndpoint{Host: "5.6.7.8", Port: 5678, PrivateInstanceId: "bar"} + endpointFoo := &Endpoint{Host: "1.2.3.4", Port: 1234, PrivateInstanceId: "foo"} + endpointBar := &Endpoint{Host: "5.6.7.8", Port: 5678, PrivateInstanceId: "bar"} pool.Add(endpointFoo) pool.Add(endpointBar) @@ -103,11 +103,11 @@ func (s *EPSuite) TestEndpointPoolFindByPrivateInstanceId(c *C) { c.Assert(found, Equals, false) } -func (s *EPSuite) TestEndpointPoolSamplingIsRandomIsh(c *C) { - pool := NewEndpointPool() +func (s *PSuite) TestPoolSamplingIsRandomish(c *C) { + pool := NewPool() - endpoint1 := &RouteEndpoint{Host: "1.2.3.4", Port: 5678} - endpoint2 := &RouteEndpoint{Host: "5.6.7.8", Port: 1234} + endpoint1 := &Endpoint{Host: "1.2.3.4", Port: 5678} + endpoint2 := &Endpoint{Host: "5.6.7.8", Port: 1234} pool.Add(endpoint1) pool.Add(endpoint2) @@ -130,10 +130,10 @@ func (s *EPSuite) TestEndpointPoolSamplingIsRandomIsh(c *C) { c.Assert(math.Abs(float64(occurrences1-occurrences2)) < 50, Equals, true) } -func (s *EPSuite) TestEndpointPoolMarshalsAsJSON(c *C) { - pool := NewEndpointPool() +func (s *PSuite) TestPoolMarshalsAsJSON(c *C) { + pool := NewPool() - pool.Add(&RouteEndpoint{Host: "1.2.3.4", Port: 5678}) + pool.Add(&Endpoint{Host: "1.2.3.4", Port: 5678}) json, err := pool.MarshalJSON() c.Assert(err, IsNil) diff --git a/uris.go b/route/uris.go similarity index 97% rename from uris.go rename to route/uris.go index 0ea14bafe..47016a7ab 100644 --- a/uris.go +++ b/route/uris.go @@ -1,4 +1,4 @@ -package router +package route import ( "strings" diff --git a/route_endpoint.go b/route_endpoint.go deleted file mode 100644 index d1920c787..000000000 --- a/route_endpoint.go +++ /dev/null @@ -1,76 +0,0 @@ -package router - -import ( - "encoding/json" - "fmt" - "sync" - "time" -) - -type RouteEndpoint struct { - sync.Mutex - - ApplicationId string - Host string - Port uint16 - Tags map[string]string - PrivateInstanceId string - - Uris Uris - updated_at time.Time -} - -func (routeEndpoint *RouteEndpoint) MarshalJSON() ([]byte, error) { - return json.Marshal(routeEndpoint.CanonicalAddr()) -} - -func newRouteEndpoint(message *registryMessage) *RouteEndpoint { - b := &RouteEndpoint{ - ApplicationId: message.App, - Host: message.Host, - Port: message.Port, - Tags: message.Tags, - PrivateInstanceId: message.PrivateInstanceId, - - Uris: make([]Uri, 0), - updated_at: time.Now(), - } - - return b -} - -func (routeEndpoint *RouteEndpoint) CanonicalAddr() string { - return fmt.Sprintf("%s:%d", routeEndpoint.Host, routeEndpoint.Port) -} - -func (routeEndpoint *RouteEndpoint) ToLogData() interface{} { - return struct { - ApplicationId string - Host string - Port uint16 - Tags map[string]string - }{ - routeEndpoint.ApplicationId, - routeEndpoint.Host, - routeEndpoint.Port, - routeEndpoint.Tags, - } -} - -func (routeEndpoint *RouteEndpoint) register(uri Uri) bool { - if !routeEndpoint.Uris.Has(uri) { - routeEndpoint.Uris = append(routeEndpoint.Uris, uri) - return true - } - - return false -} - -func (routeEndpoint *RouteEndpoint) unregister(uri Uri) bool { - remainingUris, ok := routeEndpoint.Uris.Remove(uri) - if ok { - routeEndpoint.Uris = remainingUris - } - - return ok -} diff --git a/route_endpoint_test.go b/route_endpoint_test.go deleted file mode 100644 index 7ef135b39..000000000 --- a/route_endpoint_test.go +++ /dev/null @@ -1 +0,0 @@ -package router diff --git a/router.go b/router.go index b48c40e6e..7f3ae9689 100644 --- a/router.go +++ b/router.go @@ -5,26 +5,30 @@ import ( "compress/zlib" "encoding/json" "fmt" + "net" + "runtime" + "time" + mbus "github.com/cloudfoundry/go_cfmessagebus" vcap "github.com/cloudfoundry/gorouter/common" + "github.com/cloudfoundry/gorouter/config" + "github.com/cloudfoundry/gorouter/log" "github.com/cloudfoundry/gorouter/proxy" + "github.com/cloudfoundry/gorouter/registry" + "github.com/cloudfoundry/gorouter/route" "github.com/cloudfoundry/gorouter/util" - steno "github.com/cloudfoundry/gosteno" - "net" - "runtime" - "time" ) type Router struct { - config *Config + config *config.Config proxy *Proxy mbusClient mbus.MessageBus - registry *Registry + registry *registry.Registry varz Varz component *vcap.VcapComponent } -func NewRouter(c *Config) *Router { +func NewRouter(c *config.Config) *Router { router := &Router{ config: c, } @@ -36,7 +40,7 @@ func NewRouter(c *Config) *Router { router.establishMBus() - router.registry = NewRegistry(router.config, router.mbusClient) + router.registry = registry.NewRegistry(router.config, router.mbusClient) router.registry.StartPruningCycle() router.varz = NewVarz(router.registry) @@ -50,7 +54,7 @@ func NewRouter(c *Config) *Router { varz := &vcap.Varz{ UniqueVarz: router.varz, } - varz.LogCounts = logCounter + varz.LogCounts = log.Counter healthz := &vcap.Healthz{ LockableObject: router.registry, @@ -62,7 +66,6 @@ func NewRouter(c *Config) *Router { Host: host, Credentials: []string{router.config.Status.User, router.config.Status.Pass}, Config: router.config, - Logger: log, Varz: varz, Healthz: healthz, InfoRoutes: map[string]json.Marshaler{ @@ -82,7 +85,7 @@ func (r *Router) RegisterComponent() { type registryMessage struct { Host string `json:"host"` Port uint16 `json:"port"` - Uris Uris `json:"uris"` + Uris route.Uris `json:"uris"` Tags map[string]string `json:"tags"` App string `json:"app"` @@ -96,11 +99,11 @@ func (r *Router) subscribeRegistry(subject string, successCallback func(*registr err := json.Unmarshal(payload, &msg) if err != nil { logMessage := fmt.Sprintf("%s: Error unmarshalling JSON (%d; %s): %s", subject, len(payload), payload, err) - log.Log(steno.LOG_WARN, logMessage, map[string]interface{}{"payload": string(payload)}) + log.Warnd(map[string]interface{}{"payload": string(payload)}, logMessage) } logMessage := fmt.Sprintf("%s: Received message", subject) - log.Log(steno.LOG_DEBUG, logMessage, map[string]interface{}{"message": msg}) + log.Debugd(map[string]interface{}{"message": msg}, logMessage) successCallback(&msg) } @@ -113,7 +116,7 @@ func (r *Router) subscribeRegistry(subject string, successCallback func(*registr func (router *Router) SubscribeRegister() { router.subscribeRegistry("router.register", func(registryMessage *registryMessage) { log.Infof("Got router.register: %v", registryMessage) - router.registry.Register(&RouteEndpoint{ + router.registry.Register(&route.Endpoint{ Host: registryMessage.Host, Port: registryMessage.Port, Uris: registryMessage.Uris, @@ -129,7 +132,7 @@ func (router *Router) SubscribeRegister() { func (r *Router) SubscribeUnregister() { r.subscribeRegistry("router.unregister", func(registryMessage *registryMessage) { log.Infof("Got router.unregister: %v", registryMessage) - r.registry.Unregister(&RouteEndpoint{ + r.registry.Unregister(&route.Endpoint{ Host: registryMessage.Host, Port: registryMessage.Port, Uris: registryMessage.Uris, @@ -284,5 +287,4 @@ func (r *Router) establishMBus() { port := r.config.Nats.Port r.mbusClient.Configure(host, int(port), user, pass) - r.mbusClient.SetLogger(log) } diff --git a/router_test.go b/router_test.go index 8ba1323c3..d302c347c 100644 --- a/router_test.go +++ b/router_test.go @@ -5,21 +5,25 @@ import ( "bytes" "encoding/json" "fmt" - mbus "github.com/cloudfoundry/go_cfmessagebus" - "github.com/cloudfoundry/gorouter/common" - "github.com/cloudfoundry/gorouter/test" "io/ioutil" - . "launchpad.net/gocheck" "net" "net/http" "os/exec" "regexp" "strings" "time" + + mbus "github.com/cloudfoundry/go_cfmessagebus" + . "launchpad.net/gocheck" + + "github.com/cloudfoundry/gorouter/common" + "github.com/cloudfoundry/gorouter/config" + "github.com/cloudfoundry/gorouter/registry" + "github.com/cloudfoundry/gorouter/test" ) type RouterSuite struct { - Config *Config + Config *config.Config natsServerCmd *exec.Cmd mbusClient mbus.MessageBus router *Router @@ -233,7 +237,7 @@ func timeoutDialler() func(net, addr string) (c net.Conn, err error) { } } -func verify_health_z(host string, registry *Registry, c *C) { +func verify_health_z(host string, registry *registry.Registry, c *C) { var req *http.Request var resp *http.Response var err error diff --git a/varz.go b/varz.go index e4bb0e15c..97b2319cd 100644 --- a/varz.go +++ b/varz.go @@ -3,11 +3,14 @@ package router import ( "encoding/json" "fmt" - "github.com/cloudfoundry/gorouter/stats" - metrics "github.com/rcrowley/go-metrics" "net/http" "sync" "time" + + "github.com/cloudfoundry/gorouter/registry" + "github.com/cloudfoundry/gorouter/route" + "github.com/cloudfoundry/gorouter/stats" + metrics "github.com/rcrowley/go-metrics" ) type topAppsEntry struct { @@ -157,17 +160,17 @@ type Varz interface { json.Marshaler CaptureBadRequest(req *http.Request) - CaptureRoutingRequest(b *RouteEndpoint, req *http.Request) - CaptureRoutingResponse(b *RouteEndpoint, res *http.Response, d time.Duration) + CaptureRoutingRequest(b *route.Endpoint, req *http.Request) + CaptureRoutingResponse(b *route.Endpoint, res *http.Response, d time.Duration) } type RealVarz struct { sync.Mutex - r *Registry + r *registry.Registry varz } -func NewVarz(r *Registry) Varz { +func NewVarz(r *registry.Registry) Varz { x := &RealVarz{r: r} x.All = NewHttpMetric() @@ -181,11 +184,11 @@ func (x *RealVarz) MarshalJSON() ([]byte, error) { defer x.Unlock() x.varz.Urls = x.r.NumUris() - x.varz.Droplets = x.r.NumRouteEndpoints() + x.varz.Droplets = x.r.NumEndpoints() x.varz.RequestsPerSec = x.varz.All.Rate.Rate1() millis_per_nano := int64(1000000) - x.varz.MillisSinceLastRegistryUpdate = time.Since(x.r.timeOfLastUpdate).Nanoseconds() / millis_per_nano + x.varz.MillisSinceLastRegistryUpdate = time.Since(x.r.TimeOfLastUpdate()).Nanoseconds() / millis_per_nano x.updateTop() @@ -218,7 +221,7 @@ func (x *RealVarz) CaptureBadRequest(req *http.Request) { x.BadRequests++ } -func (x *RealVarz) CaptureRoutingRequest(b *RouteEndpoint, req *http.Request) { +func (x *RealVarz) CaptureRoutingRequest(b *route.Endpoint, req *http.Request) { x.Lock() defer x.Unlock() @@ -233,14 +236,14 @@ func (x *RealVarz) CaptureRoutingRequest(b *RouteEndpoint, req *http.Request) { x.varz.All.CaptureRequest() } -func (x *RealVarz) CaptureRoutingResponse(routeEndpoint *RouteEndpoint, response *http.Response, duration time.Duration) { +func (x *RealVarz) CaptureRoutingResponse(endpoint *route.Endpoint, response *http.Response, duration time.Duration) { x.Lock() defer x.Unlock() var tags string var ok bool - tags, ok = routeEndpoint.Tags["component"] + tags, ok = endpoint.Tags["component"] if ok { x.varz.Tags.Component.CaptureResponse(tags, response, duration) } diff --git a/varz_test.go b/varz_test.go index 16505d770..f2564dc37 100644 --- a/varz_test.go +++ b/varz_test.go @@ -3,22 +3,26 @@ package router import ( "encoding/json" "fmt" - "github.com/cloudfoundry/go_cfmessagebus/mock_cfmessagebus" - . "launchpad.net/gocheck" "net/http" "time" + + "github.com/cloudfoundry/go_cfmessagebus/mock_cfmessagebus" + . "launchpad.net/gocheck" + + "github.com/cloudfoundry/gorouter/config" + "github.com/cloudfoundry/gorouter/registry" + "github.com/cloudfoundry/gorouter/route" ) type VarzSuite struct { Varz - *Registry + *registry.Registry } var _ = Suite(&VarzSuite{}) func (s *VarzSuite) SetUpTest(c *C) { - mbus := mock_cfmessagebus.NewMockMessageBus() - r := NewRegistry(DefaultConfig(), mbus) + r := registry.NewRegistry(config.DefaultConfig(), mock_cfmessagebus.NewMockMessageBus()) s.Registry = r s.Varz = NewVarz(r) } @@ -95,9 +99,10 @@ func (s *VarzSuite) TestMembersOfUniqueVarz(c *C) { } func (s *VarzSuite) TestSecondsSinceLastRegistryUpdate(c *C) { - testTime := time.Now() + s.Registry.Register(&route.Endpoint{Uris: []route.Uri{route.Uri("foo")}}) + time.Sleep(10 * time.Millisecond) - s.Registry.timeOfLastUpdate = testTime + timeSince := s.findValue("ms_since_last_registry_update").(float64) c.Assert(timeSince < 1000, Equals, true) c.Assert(timeSince >= 10, Equals, true) @@ -106,10 +111,10 @@ func (s *VarzSuite) TestSecondsSinceLastRegistryUpdate(c *C) { func (s *VarzSuite) TestUrlsInVarz(c *C) { c.Check(s.findValue("urls"), Equals, float64(0)) - var fooReg = &RouteEndpoint{ + var fooReg = &route.Endpoint{ Host: "192.168.1.1", Port: 1234, - Uris: []Uri{"foo.vcap.me", "fooo.vcap.me"}, + Uris: []route.Uri{"foo.vcap.me", "fooo.vcap.me"}, Tags: map[string]string{}, ApplicationId: "12345", @@ -132,7 +137,7 @@ func (s *VarzSuite) TestUpdateBadRequests(c *C) { } func (s *VarzSuite) TestUpdateRequests(c *C) { - b := &RouteEndpoint{} + b := &route.Endpoint{} r := http.Request{} s.Varz.CaptureRoutingRequest(b, &r) @@ -143,13 +148,13 @@ func (s *VarzSuite) TestUpdateRequests(c *C) { } func (s *VarzSuite) TestUpdateRequestsWithTags(c *C) { - b1 := &RouteEndpoint{ + b1 := &route.Endpoint{ Tags: map[string]string{ "component": "cc", }, } - b2 := &RouteEndpoint{ + b2 := &route.Endpoint{ Tags: map[string]string{ "component": "cc", }, @@ -165,7 +170,7 @@ func (s *VarzSuite) TestUpdateRequestsWithTags(c *C) { } func (s *VarzSuite) TestUpdateResponse(c *C) { - var b *RouteEndpoint = &RouteEndpoint{} + var b *route.Endpoint = &route.Endpoint{} var d time.Duration r1 := &http.Response{ @@ -187,13 +192,13 @@ func (s *VarzSuite) TestUpdateResponse(c *C) { func (s *VarzSuite) TestUpdateResponseWithTags(c *C) { var d time.Duration - b1 := &RouteEndpoint{ + b1 := &route.Endpoint{ Tags: map[string]string{ "component": "cc", }, } - b2 := &RouteEndpoint{ + b2 := &route.Endpoint{ Tags: map[string]string{ "component": "cc", }, @@ -216,7 +221,7 @@ func (s *VarzSuite) TestUpdateResponseWithTags(c *C) { } func (s *VarzSuite) TestUpdateResponseLatency(c *C) { - var routeEndpoint *RouteEndpoint = &RouteEndpoint{} + var routeEndpoint *route.Endpoint = &route.Endpoint{} var duration = 1 * time.Millisecond response := &http.Response{