diff --git a/go/vt/events/eventer/eventer.go b/go/vt/events/eventer/eventer.go index c5a68904200..edefa13b8cc 100644 --- a/go/vt/events/eventer/eventer.go +++ b/go/vt/events/eventer/eventer.go @@ -1,7 +1,9 @@ package eventer import ( + "errors" "sync" + "time" "github.com/spf13/pflag" @@ -18,31 +20,37 @@ func RegisterFlags(fs *pflag.FlagSet) { fs.StringVar(&eventerName, "eventer", eventerName, "the eventer to be used to broadcast internal events") } -type Source string +type Source struct { + Type string + Hostname string +} -var ( - Vtctld Source = "vtctld" - Vtorc Source = "vtorc" -) +type ReparentShardEvent struct { + Source Source + Time time.Time + Reparent *topoevents.Reparent + Error error +} type Eventer interface { - EmergencyReparentShard(src Source, ev *topoevents.Reparent, err error) error - PlannedReparentShard(src Source, ev *topoevents.Reparent, err error) error + Init() error + EmergencyReparentShard(ev *ReparentShardEvent) error + PlannedReparentShard(ev *ReparentShardEvent) error } -func New() Eventer { +func Get() (Eventer, error) { eventersMu.Lock() defer eventersMu.Unlock() - if ev, ok := eventers[eventerName]; ok { - return ev + if eventer, ok := eventers[eventerName]; ok { + return eventer, eventer.Init() } - return nil + return nil, errors.New("no such eventer") } -func RegisterEventer(name string, e Eventer) { +func RegisterEventer(name string, eventer Eventer) { eventersMu.Lock() defer eventersMu.Unlock() - eventers[name] = e + eventers[name] = eventer } func init() { diff --git a/go/vt/events/eventer/log.go b/go/vt/events/eventer/log.go index aa5b913d025..cb8b346d921 100644 --- a/go/vt/events/eventer/log.go +++ b/go/vt/events/eventer/log.go @@ -1,22 +1,21 @@ package eventer -import ( - "vitess.io/vitess/go/vt/log" - topoevents "vitess.io/vitess/go/vt/topotools/events" -) +import "vitess.io/vitess/go/vt/log" type LogEventer struct{} func NewLogEventer() Eventer { return &LogEventer{} } -func (le *LogEventer) EmergencyReparentShard(src Source, ev *topoevents.Reparent, err error) error { - log.Infof("Received EmergencyReparentShardEvent: source=%s, err=%v, event=%v", src, err, ev) +func (le *LogEventer) Init() error { return nil } + +func (le *LogEventer) EmergencyReparentShard(ev *ReparentShardEvent) error { + log.Infof("Received EmergencyReparentShardEvent: %v", ev) return nil } -func (le *LogEventer) PlannedReparentShard(src Source, ev *topoevents.Reparent, err error) error { - log.Infof("Received PlannedReparentShardEvent: source=%s, err=%v, event=%v", src, err, ev) +func (le *LogEventer) PlannedReparentShard(ev *ReparentShardEvent) error { + log.Infof("Received PlannedReparentShardEvent: %v", ev) return nil } diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index 310b4aa9293..9b6c91dc209 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -89,15 +89,16 @@ type VtctldServer struct { } // NewVtctldServer returns a new VtctldServer for the given topo server. -func NewVtctldServer(ts *topo.Server) *VtctldServer { +func NewVtctldServer(ts *topo.Server) (*VtctldServer, error) { tmc := tmclient.NewTabletManagerClient() + evr, err := eventer.Get() return &VtctldServer{ ts: ts, tmc: tmc, ws: workflow.NewServer(ts, tmc), - evr: eventer.New(), - } + evr: evr, + }, err } func panicHandler(err *error) {