Skip to content

Commit

Permalink
Add listen events command (#171)
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jul 27, 2023
1 parent 9986cdd commit d662844
Show file tree
Hide file tree
Showing 8 changed files with 718 additions and 85 deletions.
290 changes: 290 additions & 0 deletions eventlog/event_log.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions eventlog/event_log.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
syntax = "proto3";
package milvus.proto.eventlog;

option go_package = "github.com/milvus-io/birdwatcher/eventlog";

service EventLogService {
rpc Listen(ListenRequest) returns(stream Event) {}
}

message ListenRequest {
}

message Event {
Level level = 1;
int32 type = 2;
bytes data = 3;
int64 ts = 4;
}

enum Level {
Undefined = 0;
Debug = 1;
Info = 2;
Warn = 3;
Error = 4;
}
67 changes: 67 additions & 0 deletions eventlog/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package eventlog

import (
"context"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type Listener struct {
conn *grpc.ClientConn
client EventLogServiceClient
stream EventLogService_ListenClient
closed chan struct{}
}

func NewListener(ctx context.Context, addr string) (*Listener, error) {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
grpc.WithTimeout(time.Second),
}

conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
return nil, err
}

client := NewEventLogServiceClient(conn)
return &Listener{
conn: conn,
client: client,
}, nil
}

func (l *Listener) Start(ctx context.Context) (<-chan *Event, error) {
ch := make(chan *Event, 100)
s, err := l.client.Listen(ctx, &ListenRequest{})
if err != nil {
return nil, err
}

go func() {
defer close(ch)
for {
evt, err := s.Recv()
if err != nil {
return
}
select {
case ch <- evt:
case <-l.closed:
return
}
}
}()

return ch, nil
}

func (l *Listener) Stop() {
close(l.closed)
if l.stream != nil {
l.stream.CloseSend()
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/c-bata/go-prompt v0.2.6
github.com/cockroachdb/errors v1.9.1
github.com/confluentinc/confluent-kafka-go v1.9.1
github.com/fatih/color v1.7.0
github.com/gin-gonic/gin v1.9.1
github.com/golang/protobuf v1.5.2
github.com/gosuri/uilive v0.0.4
Expand Down
Loading

0 comments on commit d662844

Please sign in to comment.