Skip to content

Commit

Permalink
feat: add nats readmer
Browse files Browse the repository at this point in the history
  • Loading branch information
franklinkim committed Sep 11, 2023
1 parent 9983e26 commit c964153
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 41 deletions.
14 changes: 14 additions & 0 deletions interfaces/readmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,17 @@ package interfaces
type Readmer interface {
Readme() string
}

type ReadmeHandler struct {
Value func() string
}

func (r ReadmeHandler) Readme() string {
return r.Value()
}

func ReadmeFunc(v func() string) ReadmeHandler {
return ReadmeHandler{
Value: v,
}
}
1 change: 1 addition & 0 deletions markdown/markdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
markdowntable "github.com/fbiville/markdown-table-formatter/pkg/markdown"
)

// Markdown output helper
type Markdown struct {
value string
}
Expand Down
62 changes: 62 additions & 0 deletions net/stream/jetstream/readme.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package jetstream

import (
"github.com/foomo/keel/markdown"
)

type (
publisher struct {
Namespace string
Stream string
Subject string
}
subscriber struct {
Namespace string
Stream string
Subject string
}
)

var (
publishers []publisher
subscribers []subscriber
)

func Readme() string {
if len(publishers) == 0 && len(subscribers) == 0 {
return ""
}

var rows [][]string
md := &markdown.Markdown{}
md.Println("### NATS")
md.Println("")
md.Println("List of all registered nats publishers & subscribers.")
md.Println("")

if len(publishers) > 0 {
for _, value := range publishers {
rows = append(rows, []string{
markdown.Code(value.Namespace),
markdown.Code(value.Stream),
markdown.Code(value.Subject),
markdown.Code("publish"),
})
}
}

if len(subscribers) > 0 {
for _, value := range subscribers {
rows = append(rows, []string{
markdown.Code(value.Namespace),
markdown.Code(value.Stream),
markdown.Code(value.Subject),
markdown.Code("subscribe"),
})
}
}

md.Table([]string{"Namespace", "Stream", "Subject", "Type"}, rows)

return md.String()
}
29 changes: 29 additions & 0 deletions net/stream/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jetstream

import (
"encoding/json"
"slices"
"time"

"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -280,6 +281,20 @@ func (s *Stream) Publisher(subject string, opts ...PublisherOption) *Publisher {
opt(pub)
}
}

{ // append to recoreded publishers
value := publisher{
Stream: s.name,
Namespace: s.namespace,
Subject: subject,
}
if !slices.ContainsFunc(publishers, func(p publisher) bool {
return p.Stream == value.Stream && p.Namespace == value.Namespace && p.Subject == value.Subject
}) {
publishers = append(publishers, value)
}
}

return pub
}

Expand All @@ -295,6 +310,20 @@ func (s *Stream) Subscriber(subject string, opts ...SubscriberOption) *Subscribe
opt(sub)
}
}

{ // append to recoreded publishers
value := subscriber{
Stream: s.name,
Namespace: s.namespace,
Subject: subject,
}
if !slices.ContainsFunc(subscribers, func(p subscriber) bool {
return p.Stream == value.Stream && p.Namespace == value.Namespace && p.Subject == value.Subject
}) {
subscribers = append(subscribers, value)
}
}

return sub
}

Expand Down
5 changes: 0 additions & 5 deletions persistence/mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ func CollectionWithIndexesCommitQuorumVotingMembers(v context.Context) Collectio
// ~ Constructor
// ------------------------------------------------------------------------------------------------

var (
dbs = map[string][]string{}
indices = map[string]map[string][]string{}
)

func NewCollection(db *mongo.Database, name string, opts ...CollectionOption) (*Collection, error) {
o := DefaultCollectionOptions()
for _, opt := range opts {
Expand Down
5 changes: 5 additions & 0 deletions persistence/mongo/readme.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"github.com/foomo/keel/markdown"
)

var (
dbs = map[string][]string{}
indices = map[string]map[string][]string{}
)

func Readme() string {
var rows [][]string
md := &markdown.Markdown{}
Expand Down
18 changes: 8 additions & 10 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/foomo/keel/interfaces"
"github.com/foomo/keel/markdown"
"github.com/foomo/keel/metrics"
keelmongo "github.com/foomo/keel/persistence/mongo"
"github.com/foomo/keel/service"
"github.com/go-logr/logr"
"github.com/pkg/errors"
Expand Down Expand Up @@ -179,7 +178,12 @@ func NewServer(opts ...Option) *Server {

// add probe
inst.AddAlwaysHealthzers(inst)
inst.AddReadmer(inst)
inst.AddReadmers(
interfaces.ReadmeFunc(env.Readme),
interfaces.ReadmeFunc(config.Readme),
inst,
interfaces.ReadmeFunc(metrics.Readme),
)

// start init services
inst.startService(inst.initServices...)
Expand Down Expand Up @@ -258,15 +262,13 @@ func (s *Server) AddClosers(closers ...interface{}) {

// AddReadmer adds a readmer to be added to the exposed readme
func (s *Server) AddReadmer(readmer interfaces.Readmer) {
if !slices.Contains(s.readmers, readmer) {
s.readmers = append(s.readmers, readmer)
}
s.readmers = append(s.readmers, readmer)
}

// AddReadmers adds readmers to be added to the exposed readme
func (s *Server) AddReadmers(readmers ...interfaces.Readmer) {
for _, readmer := range readmers {
s.AddCloser(readmer)
s.AddReadmer(readmer)
}
}

Expand Down Expand Up @@ -355,13 +357,9 @@ func (s *Server) Run() {
func (s *Server) Readme() string {
md := &markdown.Markdown{}

md.Print(env.Readme())
md.Print(config.Readme())
md.Println(s.readmeServices())
md.Println(s.readmeHealthz())
md.Print(s.readmeCloser())
md.Print(keelmongo.Readme())
md.Print(metrics.Readme())

return md.String()
}
Expand Down
10 changes: 8 additions & 2 deletions service/goroutine.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type (
GoRoutineFn func(ctx context.Context, l *zap.Logger) error
)

func NewGoRoutine(l *zap.Logger, name string, handler GoRoutineFn) *GoRoutine {
func NewGoRoutine(l *zap.Logger, name string, handler GoRoutineFn, opts ...GoRoutineOption) *GoRoutine {
if l == nil {
l = log.Logger()
}
Expand All @@ -38,12 +38,18 @@ func NewGoRoutine(l *zap.Logger, name string, handler GoRoutineFn) *GoRoutine {
log.KeelServiceNameKey.String(name),
)

return &GoRoutine{
inst := &GoRoutine{
handler: handler,
name: name,
parallel: 1,
l: l,
}

for _, opt := range opts {
opt(inst)
}

return inst
}

// ------------------------------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions service/goroutine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ func ExampleNewGoRoutine() {
svr.AddService(
service.NewGoRoutine(svr.Logger(), "demo", func(ctx context.Context, l *zap.Logger) error {
for {
// handle graceful shutdowns
if err := ctx.Err(); errors.Is(context.Cause(ctx), service.ErrServiceShutdown) {
l.Info("context has been canceled du to graceful shutdow")
return nil
} else if err != nil {
return errors.Wrap(err, "unexpected context error")
}

l.Info("ping")
time.Sleep(time.Second)
}
Expand Down
24 changes: 0 additions & 24 deletions service/httpreadme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ import (
"io"
"net/http"
"os"
"time"

"github.com/foomo/keel"
"github.com/foomo/keel/config"
"github.com/foomo/keel/env"
"github.com/foomo/keel/examples/persistence/mongo/store"
"github.com/foomo/keel/log"
keelmongo "github.com/foomo/keel/persistence/mongo"
"github.com/foomo/keel/service"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -44,26 +40,6 @@ func ExampleNewHTTPReadme() {
_ = config.MustGetBool(c, "example.required.bool")
_ = config.MustGetString(c, "example.required.string")

// create persistor
persistor, err := keelmongo.New(svr.Context(), "mongodb://localhost:27017/dummy")
log.Must(l, err, "failed to create persistor")

// ensure to add the persistor to the closers
svr.AddClosers(persistor)

// create repositories
_, err = persistor.Collection(
"dummy",
// define indexes but beware of changes on large dbs
keelmongo.CollectionWithIndexes(
store.EntityIndex,
store.EntityWithVersionsIndex,
),
// define max time for index creation
keelmongo.CollectionWithIndexesMaxTime(time.Minute),
)
log.Must(l, err, "failed to create collection")

// add http service
svr.AddService(service.NewHTTP(l, "demp-http", "localhost:8080", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
Expand Down

0 comments on commit c964153

Please sign in to comment.